8 Commits

Author SHA1 Message Date
14abc446b7 Merge pull request 'add-os-load' (#3) from add-os-load into main
Some checks failed
Test Suite / lint (push) Successful in 25s
Test Suite / test (3.11) (push) Successful in 1m29s
Release / test (push) Successful in 1m2s
Release / build-and-release (push) Failing after 32s
Test Suite / build (push) Successful in 45s
Security Scan / security (push) Successful in 46s
Security Scan / dependency-check (push) Successful in 49s
This adds support for loading data from Opensearch.

Reviewed-on: #3
2025-08-14 19:07:24 -07:00
1b6845774b fix formatting and bump version to v0.3.0
All checks were successful
Security Scan / dependency-check (pull_request) Successful in 44s
Test Suite / lint (pull_request) Successful in 34s
Test Suite / build (pull_request) Successful in 38s
Security Scan / security (pull_request) Successful in 49s
Test Suite / test (3.11) (pull_request) Successful in 1m32s
2025-08-14 19:02:17 -07:00
09e3c86f0a opensearch load improvements
Some checks failed
Security Scan / dependency-check (pull_request) Successful in 44s
Test Suite / lint (pull_request) Failing after 32s
Security Scan / security (pull_request) Successful in 45s
Test Suite / test (3.11) (pull_request) Successful in 1m31s
Test Suite / build (pull_request) Has been skipped
2025-08-14 14:30:52 -07:00
9cf2f0e6fa this will load data from Opensearch.
it doesn't have prompts as well
2025-08-14 13:49:46 -07:00
a2adc8b958 Merge pull request 'fixed refactored code and validated inputs' (#2) from validate-inputs into main
Some checks failed
Security Scan / dependency-check (push) Successful in 34s
Security Scan / security (push) Successful in 40s
Test Suite / lint (push) Successful in 27s
Test Suite / test (3.11) (push) Successful in 1m30s
Release / test (push) Successful in 59s
Release / build-and-release (push) Failing after 36s
Test Suite / build (push) Successful in 46s
Fixed the refactored version, removed app.py, added error feedback on bad input files.

Reviewed-on: #2
2025-08-14 08:11:28 -07:00
4867614474 reformat
All checks were successful
Security Scan / dependency-check (pull_request) Successful in 35s
Security Scan / security (pull_request) Successful in 39s
Test Suite / lint (pull_request) Successful in 30s
Test Suite / test (3.11) (pull_request) Successful in 1m26s
Test Suite / build (pull_request) Successful in 37s
2025-08-14 08:07:50 -07:00
6a995635ac remove upload success alert
Some checks failed
Security Scan / security (pull_request) Successful in 40s
Test Suite / test (3.11) (pull_request) Successful in 1m25s
Test Suite / build (pull_request) Has been skipped
Security Scan / dependency-check (pull_request) Successful in 35s
Test Suite / lint (pull_request) Failing after 26s
2025-08-14 08:00:47 -07:00
7b81c20a26 fixed refactored code
Some checks failed
Security Scan / dependency-check (pull_request) Successful in 38s
Security Scan / security (pull_request) Successful in 41s
Test Suite / lint (pull_request) Failing after 28s
Test Suite / test (3.11) (pull_request) Successful in 1m27s
Test Suite / build (pull_request) Has been skipped
2025-08-14 07:55:40 -07:00
33 changed files with 2518 additions and 547 deletions

View File

@@ -6,6 +6,7 @@
"Bash(uv add:*)" "Bash(uv add:*)"
], ],
"deny": [], "deny": [],
"ask": [] "ask": [],
"defaultMode": "acceptEdits"
} }
} }

View File

@@ -90,7 +90,7 @@ uv run python main.py
The application follows a modular architecture for improved maintainability and testability: The application follows a modular architecture for improved maintainability and testability:
``` ```text
src/embeddingbuddy/ src/embeddingbuddy/
├── config/ # Configuration management ├── config/ # Configuration management
│ └── settings.py # Centralized app settings │ └── settings.py # Centralized app settings
@@ -115,8 +115,8 @@ src/embeddingbuddy/
Run the test suite to verify functionality: Run the test suite to verify functionality:
```bash ```bash
# Install pytest # Install test dependencies
uv add pytest uv sync --extra test
# Run all tests # Run all tests
uv run pytest tests/ -v uv run pytest tests/ -v
@@ -128,6 +128,31 @@ uv run pytest tests/test_data_processing.py -v
uv run pytest tests/ --cov=src/embeddingbuddy uv run pytest tests/ --cov=src/embeddingbuddy
``` ```
### Development Tools
Install development dependencies for linting, type checking, and security:
```bash
# Install all dev dependencies
uv sync --extra dev
# Or install specific groups
uv sync --extra test # Testing tools
uv sync --extra lint # Linting and formatting
uv sync --extra security # Security scanning tools
# Run linting
uv run ruff check src/ tests/
uv run ruff format src/ tests/
# Run type checking
uv run mypy src/embeddingbuddy/
# Run security scans
uv run bandit -r src/
uv run safety check
```
### Adding New Features ### Adding New Features
The modular architecture makes it easy to extend functionality: The modular architecture makes it easy to extend functionality:

515
app.py
View File

@@ -1,515 +0,0 @@
import json
import uuid
from io import StringIO
import base64
import dash
from dash import dcc, html, Input, Output, State, callback
import dash_bootstrap_components as dbc
import plotly.express as px
import plotly.graph_objects as go
import pandas as pd
import numpy as np
from sklearn.decomposition import PCA
import umap
from openTSNE import TSNE
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])
def parse_ndjson(contents):
"""Parse NDJSON content and return list of documents."""
content_type, content_string = contents.split(',')
decoded = base64.b64decode(content_string)
text_content = decoded.decode('utf-8')
documents = []
for line in text_content.strip().split('\n'):
if line.strip():
doc = json.loads(line)
if 'id' not in doc:
doc['id'] = str(uuid.uuid4())
documents.append(doc)
return documents
def apply_dimensionality_reduction(embeddings, method='pca', n_components=3):
"""Apply dimensionality reduction to embeddings."""
if method == 'pca':
reducer = PCA(n_components=n_components)
reduced = reducer.fit_transform(embeddings)
variance_explained = reducer.explained_variance_ratio_
return reduced, variance_explained
elif method == 'tsne':
reducer = TSNE(n_components=n_components, random_state=42)
reduced = reducer.fit(embeddings)
return reduced, None
elif method == 'umap':
reducer = umap.UMAP(n_components=n_components, random_state=42)
reduced = reducer.fit_transform(embeddings)
return reduced, None
else:
raise ValueError(f"Unknown method: {method}")
def create_color_mapping(documents, color_by):
"""Create color mapping for documents based on specified field."""
if color_by == 'category':
values = [doc.get('category', 'Unknown') for doc in documents]
elif color_by == 'subcategory':
values = [doc.get('subcategory', 'Unknown') for doc in documents]
elif color_by == 'tags':
values = [', '.join(doc.get('tags', [])) if doc.get('tags') else 'No tags' for doc in documents]
else:
values = ['All'] * len(documents)
return values
def create_plot(df, dimensions='3d', color_by='category', method='PCA'):
"""Create plotly scatter plot."""
color_values = create_color_mapping(df.to_dict('records'), color_by)
# Truncate text for hover display
df_display = df.copy()
df_display['text_preview'] = df_display['text'].apply(lambda x: x[:100] + "..." if len(x) > 100 else x)
# Include all metadata fields in hover
hover_fields = ['id', 'text_preview', 'category', 'subcategory']
# Add tags as a string for hover
df_display['tags_str'] = df_display['tags'].apply(lambda x: ', '.join(x) if x else 'None')
hover_fields.append('tags_str')
if dimensions == '3d':
fig = px.scatter_3d(
df_display, x='dim_1', y='dim_2', z='dim_3',
color=color_values,
hover_data=hover_fields,
title=f'3D Embedding Visualization - {method} (colored by {color_by})'
)
fig.update_traces(marker=dict(size=5))
else:
fig = px.scatter(
df_display, x='dim_1', y='dim_2',
color=color_values,
hover_data=hover_fields,
title=f'2D Embedding Visualization - {method} (colored by {color_by})'
)
fig.update_traces(marker=dict(size=8))
fig.update_layout(
height=None, # Let CSS height control this
autosize=True,
margin=dict(l=0, r=0, t=50, b=0)
)
return fig
def create_dual_plot(doc_df, prompt_df, dimensions='3d', color_by='category', method='PCA', show_prompts=None):
"""Create plotly scatter plot with separate traces for documents and prompts."""
# Create the base figure
fig = go.Figure()
# Helper function to convert colors to grayscale
def to_grayscale_hex(color_str):
"""Convert a color to grayscale while maintaining some distinction."""
import plotly.colors as pc
# Try to get RGB values from the color
try:
if color_str.startswith('#'):
# Hex color
rgb = tuple(int(color_str[i:i+2], 16) for i in (1, 3, 5))
else:
# Named color or other format - convert through plotly
rgb = pc.hex_to_rgb(pc.convert_colors_to_same_type([color_str], colortype='hex')[0][0])
# Convert to grayscale using luminance formula, but keep some color
gray_value = int(0.299 * rgb[0] + 0.587 * rgb[1] + 0.114 * rgb[2])
# Make it a bit more gray but not completely
gray_rgb = (gray_value * 0.7 + rgb[0] * 0.3,
gray_value * 0.7 + rgb[1] * 0.3,
gray_value * 0.7 + rgb[2] * 0.3)
return f'rgb({int(gray_rgb[0])},{int(gray_rgb[1])},{int(gray_rgb[2])})'
except:
return 'rgb(128,128,128)' # fallback gray
# Create document plot using plotly express for consistent colors
doc_color_values = create_color_mapping(doc_df.to_dict('records'), color_by)
doc_df_display = doc_df.copy()
doc_df_display['text_preview'] = doc_df_display['text'].apply(lambda x: x[:100] + "..." if len(x) > 100 else x)
doc_df_display['tags_str'] = doc_df_display['tags'].apply(lambda x: ', '.join(x) if x else 'None')
hover_fields = ['id', 'text_preview', 'category', 'subcategory', 'tags_str']
# Create documents plot to get the color mapping
if dimensions == '3d':
doc_fig = px.scatter_3d(
doc_df_display, x='dim_1', y='dim_2', z='dim_3',
color=doc_color_values,
hover_data=hover_fields
)
else:
doc_fig = px.scatter(
doc_df_display, x='dim_1', y='dim_2',
color=doc_color_values,
hover_data=hover_fields
)
# Add document traces to main figure
for trace in doc_fig.data:
trace.name = f'Documents - {trace.name}'
if dimensions == '3d':
trace.marker.size = 5
trace.marker.symbol = 'circle'
else:
trace.marker.size = 8
trace.marker.symbol = 'circle'
trace.marker.opacity = 1.0
fig.add_trace(trace)
# Add prompt traces if they exist
if prompt_df is not None and show_prompts and 'show' in show_prompts:
prompt_color_values = create_color_mapping(prompt_df.to_dict('records'), color_by)
prompt_df_display = prompt_df.copy()
prompt_df_display['text_preview'] = prompt_df_display['text'].apply(lambda x: x[:100] + "..." if len(x) > 100 else x)
prompt_df_display['tags_str'] = prompt_df_display['tags'].apply(lambda x: ', '.join(x) if x else 'None')
# Create prompts plot to get consistent color grouping
if dimensions == '3d':
prompt_fig = px.scatter_3d(
prompt_df_display, x='dim_1', y='dim_2', z='dim_3',
color=prompt_color_values,
hover_data=hover_fields
)
else:
prompt_fig = px.scatter(
prompt_df_display, x='dim_1', y='dim_2',
color=prompt_color_values,
hover_data=hover_fields
)
# Add prompt traces with grayed colors
for trace in prompt_fig.data:
# Convert the color to grayscale
original_color = trace.marker.color
if hasattr(trace.marker, 'color') and isinstance(trace.marker.color, str):
trace.marker.color = to_grayscale_hex(trace.marker.color)
trace.name = f'Prompts - {trace.name}'
if dimensions == '3d':
trace.marker.size = 6
trace.marker.symbol = 'diamond'
else:
trace.marker.size = 10
trace.marker.symbol = 'diamond'
trace.marker.opacity = 0.8
fig.add_trace(trace)
title = f'{dimensions.upper()} Embedding Visualization - {method} (colored by {color_by})'
fig.update_layout(
title=title,
height=None,
autosize=True,
margin=dict(l=0, r=0, t=50, b=0)
)
return fig
# Layout
app.layout = dbc.Container([
dbc.Row([
dbc.Col([
html.H1("EmbeddingBuddy", className="text-center mb-4"),
], width=12)
]),
dbc.Row([
# Left sidebar with controls
dbc.Col([
html.H5("Upload Data", className="mb-3"),
dcc.Upload(
id='upload-data',
children=html.Div([
'Drag and Drop or ',
html.A('Select Files')
]),
style={
'width': '100%',
'height': '60px',
'lineHeight': '60px',
'borderWidth': '1px',
'borderStyle': 'dashed',
'borderRadius': '5px',
'textAlign': 'center',
'margin-bottom': '20px'
},
multiple=False
),
dcc.Upload(
id='upload-prompts',
children=html.Div([
'Drag and Drop Prompts or ',
html.A('Select Files')
]),
style={
'width': '100%',
'height': '60px',
'lineHeight': '60px',
'borderWidth': '1px',
'borderStyle': 'dashed',
'borderRadius': '5px',
'textAlign': 'center',
'margin-bottom': '20px',
'borderColor': '#28a745'
},
multiple=False
),
dbc.Button(
"Reset All Data",
id='reset-button',
color='danger',
outline=True,
size='sm',
className='mb-3',
style={'width': '100%'}
),
html.H5("Visualization Controls", className="mb-3"),
dbc.Label("Method:"),
dcc.Dropdown(
id='method-dropdown',
options=[
{'label': 'PCA', 'value': 'pca'},
{'label': 't-SNE', 'value': 'tsne'},
{'label': 'UMAP', 'value': 'umap'}
],
value='pca',
style={'margin-bottom': '15px'}
),
dbc.Label("Color by:"),
dcc.Dropdown(
id='color-dropdown',
options=[
{'label': 'Category', 'value': 'category'},
{'label': 'Subcategory', 'value': 'subcategory'},
{'label': 'Tags', 'value': 'tags'}
],
value='category',
style={'margin-bottom': '15px'}
),
dbc.Label("Dimensions:"),
dcc.RadioItems(
id='dimension-toggle',
options=[
{'label': '2D', 'value': '2d'},
{'label': '3D', 'value': '3d'}
],
value='3d',
style={'margin-bottom': '20px'}
),
dbc.Label("Show Prompts:"),
dcc.Checklist(
id='show-prompts-toggle',
options=[{'label': 'Show prompts on plot', 'value': 'show'}],
value=['show'],
style={'margin-bottom': '20px'}
),
html.H5("Point Details", className="mb-3"),
html.Div(id='point-details', children="Click on a point to see details")
], width=3, style={'padding-right': '20px'}),
# Main visualization area
dbc.Col([
dcc.Graph(
id='embedding-plot',
style={'height': '85vh', 'width': '100%'},
config={'responsive': True, 'displayModeBar': True}
)
], width=9)
]),
dcc.Store(id='processed-data'),
dcc.Store(id='processed-prompts')
], fluid=True)
@callback(
Output('processed-data', 'data'),
Input('upload-data', 'contents'),
State('upload-data', 'filename')
)
def process_uploaded_file(contents, filename):
if contents is None:
return None
try:
documents = parse_ndjson(contents)
embeddings = np.array([doc['embedding'] for doc in documents])
# Store original embeddings and documents
return {
'documents': documents,
'embeddings': embeddings.tolist()
}
except Exception as e:
return {'error': str(e)}
@callback(
Output('processed-prompts', 'data'),
Input('upload-prompts', 'contents'),
State('upload-prompts', 'filename')
)
def process_uploaded_prompts(contents, filename):
if contents is None:
return None
try:
prompts = parse_ndjson(contents)
embeddings = np.array([prompt['embedding'] for prompt in prompts])
# Store original embeddings and prompts
return {
'prompts': prompts,
'embeddings': embeddings.tolist()
}
except Exception as e:
return {'error': str(e)}
@callback(
Output('embedding-plot', 'figure'),
[Input('processed-data', 'data'),
Input('processed-prompts', 'data'),
Input('method-dropdown', 'value'),
Input('color-dropdown', 'value'),
Input('dimension-toggle', 'value'),
Input('show-prompts-toggle', 'value')]
)
def update_plot(data, prompts_data, method, color_by, dimensions, show_prompts):
if not data or 'error' in data:
return go.Figure().add_annotation(
text="Upload a valid NDJSON file to see visualization",
xref="paper", yref="paper",
x=0.5, y=0.5, xanchor='center', yanchor='middle',
showarrow=False, font=dict(size=16)
)
# Prepare embeddings for dimensionality reduction
doc_embeddings = np.array(data['embeddings'])
all_embeddings = doc_embeddings
has_prompts = prompts_data and 'error' not in prompts_data and prompts_data.get('prompts')
if has_prompts:
prompt_embeddings = np.array(prompts_data['embeddings'])
all_embeddings = np.vstack([doc_embeddings, prompt_embeddings])
n_components = 3 if dimensions == '3d' else 2
# Apply dimensionality reduction to combined data
reduced, variance_explained = apply_dimensionality_reduction(
all_embeddings, method=method, n_components=n_components
)
# Split reduced embeddings back
doc_reduced = reduced[:len(doc_embeddings)]
prompt_reduced = reduced[len(doc_embeddings):] if has_prompts else None
# Create dataframes
doc_df_data = []
for i, doc in enumerate(data['documents']):
row = {
'id': doc['id'],
'text': doc['text'],
'category': doc.get('category', 'Unknown'),
'subcategory': doc.get('subcategory', 'Unknown'),
'tags': doc.get('tags', []),
'dim_1': doc_reduced[i, 0],
'dim_2': doc_reduced[i, 1],
'type': 'document'
}
if dimensions == '3d':
row['dim_3'] = doc_reduced[i, 2]
doc_df_data.append(row)
doc_df = pd.DataFrame(doc_df_data)
prompt_df = None
if has_prompts and prompt_reduced is not None:
prompt_df_data = []
for i, prompt in enumerate(prompts_data['prompts']):
row = {
'id': prompt['id'],
'text': prompt['text'],
'category': prompt.get('category', 'Unknown'),
'subcategory': prompt.get('subcategory', 'Unknown'),
'tags': prompt.get('tags', []),
'dim_1': prompt_reduced[i, 0],
'dim_2': prompt_reduced[i, 1],
'type': 'prompt'
}
if dimensions == '3d':
row['dim_3'] = prompt_reduced[i, 2]
prompt_df_data.append(row)
prompt_df = pd.DataFrame(prompt_df_data)
return create_dual_plot(doc_df, prompt_df, dimensions, color_by, method.upper(), show_prompts)
@callback(
Output('point-details', 'children'),
Input('embedding-plot', 'clickData'),
[State('processed-data', 'data'),
State('processed-prompts', 'data')]
)
def display_click_data(clickData, data, prompts_data):
if not clickData or not data:
return "Click on a point to see details"
# Get point info from click
point_data = clickData['points'][0]
trace_name = point_data.get('fullData', {}).get('name', 'Documents')
if 'pointIndex' in point_data:
point_index = point_data['pointIndex']
elif 'pointNumber' in point_data:
point_index = point_data['pointNumber']
else:
return "Could not identify clicked point"
# Determine which dataset this point belongs to
if trace_name == 'Prompts' and prompts_data and 'prompts' in prompts_data:
item = prompts_data['prompts'][point_index]
item_type = 'Prompt'
else:
item = data['documents'][point_index]
item_type = 'Document'
return dbc.Card([
dbc.CardBody([
html.H5(f"{item_type}: {item['id']}", className="card-title"),
html.P(f"Text: {item['text']}", className="card-text"),
html.P(f"Category: {item.get('category', 'Unknown')}", className="card-text"),
html.P(f"Subcategory: {item.get('subcategory', 'Unknown')}", className="card-text"),
html.P(f"Tags: {', '.join(item.get('tags', [])) if item.get('tags') else 'None'}", className="card-text"),
html.P(f"Type: {item_type}", className="card-text text-muted")
])
])
@callback(
[Output('processed-data', 'data', allow_duplicate=True),
Output('processed-prompts', 'data', allow_duplicate=True),
Output('point-details', 'children', allow_duplicate=True)],
Input('reset-button', 'n_clicks'),
prevent_initial_call=True
)
def reset_data(n_clicks):
if n_clicks is None or n_clicks == 0:
return dash.no_update, dash.no_update, dash.no_update
return None, None, "Click on a point to see details"
if __name__ == '__main__':
app.run(debug=True)

View File

@@ -0,0 +1,157 @@
# Elasticsearch/OpenSearch Sample Data
This directory contains sample data files in Elasticsearch bulk index format for testing the OpenSearch integration in EmbeddingBuddy.
## Files
### Original NDJSON Files
- `sample_data.ndjson` - Original sample documents in EmbeddingBuddy format
- `sample_prompts.ndjson` - Original sample prompts in EmbeddingBuddy format
### Elasticsearch Bulk Files
- `sample_data_es_bulk.ndjson` - Documents in ES bulk format (index: "embeddings")
- `sample_prompts_es_bulk.ndjson` - Prompts in ES bulk format (index: "prompts")
## Usage
### 1. Index the data using curl
```bash
# Index main documents
curl -X POST "localhost:9200/_bulk" \
-H "Content-Type: application/x-ndjson" \
--data-binary @sample_data_es_bulk.ndjson
# Index prompts
curl -X POST "localhost:9200/_bulk" \
-H "Content-Type: application/x-ndjson" \
--data-binary @sample_prompts_es_bulk.ndjson
```
### 2. Create proper mappings (recommended)
First create the index with proper dense_vector mapping:
```bash
# Create embeddings index with dense_vector mapping
curl -X PUT "localhost:9200/embeddings" \
-H "Content-Type: application/json" \
-d '{
"settings": {
"index.knn": true
},
"mappings": {
"properties": {
"id": {"type": "keyword"},
"embedding": {
"type": "knn_vector",
"dimension": 8,
"method": {
"engine": "lucene",
"space_type": "cosinesimil",
"name": "hnsw",
"parameters": {}
}
},
"text": {"type": "text"},
"category": {"type": "keyword"},
"subcategory": {"type": "keyword"},
"tags": {"type": "keyword"}
}
}
}'
# Create dense vector index with alternative field names
curl -X PUT "localhost:9200/prompts" \
-H "Content-Type: application/json" \
-d '{
"settings": {
"index.knn": true
},
"mappings": {
"properties": {
"id": {"type": "keyword"},
"embedding": {
"type": "knn_vector",
"dimension": 8,
"method": {
"engine": "lucene",
"space_type": "cosinesimil",
"name": "hnsw",
"parameters": {}
}
},
"text": {"type": "text"},
"category": {"type": "keyword"},
"subcategory": {"type": "keyword"},
"tags": {"type": "keyword"}
}
}
}'
```
Then index the data using the bulk files above.
### 3. Test in EmbeddingBuddy
#### For "embeddings" index
- **OpenSearch URL**: `http://localhost:9200`
- **Index Name**: `embeddings`
- **Field Mapping**:
- Embedding Field: `embedding`
- Text Field: `text`
- ID Field: `id`
- Category Field: `category`
- Subcategory Field: `subcategory`
- Tags Field: `tags`
#### For "embeddings-dense" index (alternative field names)
- **OpenSearch URL**: `http://localhost:9200`
- **Index Name**: `embeddings-dense`
- **Field Mapping**:
- Embedding Field: `vector`
- Text Field: `content`
- ID Field: `doc_id`
- Category Field: `type`
- Subcategory Field: `subtopic`
- Tags Field: `keywords`
## Data Structure
### Original Format (from NDJSON files)
```json
{
"id": "doc_001",
"embedding": [0.2, -0.1, 0.8, 0.3, -0.5, 0.7, 0.1, -0.3],
"text": "Machine learning algorithms are transforming healthcare...",
"category": "technology",
"subcategory": "healthcare",
"tags": ["ai", "medicine", "prediction"]
}
```
### ES Bulk Format
```json
{"index": {"_index": "embeddings", "_id": "doc_001"}}
{"id": "doc_001", "embedding": [...], "text": "...", "category": "...", ...}
```
### Alternative Field Names (dense vector format)
```json
{"index": {"_index": "embeddings-dense", "_id": "doc_001"}}
{"doc_id": "doc_001", "vector": [...], "content": "...", "type": "...", ...}
```
## Notes
- All embedding vectors are 8-dimensional for these sample files
- The alternative format demonstrates how EmbeddingBuddy's field mapping handles different field names
- For production use, you may want larger embedding dimensions (e.g., 384, 768, 1536)
- The `dense_vector` field type in Elasticsearch/OpenSearch enables vector similarity search

View File

@@ -0,0 +1,2 @@
<EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>{"id": "doc_001", "embedding": [0.1, -0.3, 0.7, 0.2], "text": "Binary junk at start"}
{"id": "doc_002", "embedding": [0.5, 0.1, -0.2, 0.8], "text": "Normal line"}<7D><><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD><EFBFBD>

View File

@@ -0,0 +1,6 @@
{"id": "doc_001", "embedding": [0.1, -0.3, 0.7, 0.2], "text": "First line"}
{"id": "doc_002", "embedding": [0.5, 0.1, -0.2, 0.8], "text": "After empty line"}
{"id": "doc_003", "embedding": [0.3, 0.4, 0.1, -0.1], "text": "After multiple empty lines"}

View File

@@ -0,0 +1,4 @@
{"id": "doc_001", "embedding": [0.1, -0.3, 0.7, 0.2], "text": "4D embedding"}
{"id": "doc_002", "embedding": [0.5, 0.1, -0.2], "text": "3D embedding"}
{"id": "doc_003", "embedding": [0.3, 0.4, 0.1, -0.1, 0.8], "text": "5D embedding"}
{"id": "doc_004", "embedding": [0.2, 0.1], "text": "2D embedding"}

View File

@@ -0,0 +1,8 @@
{"id": "doc_001", "embedding": "not_an_array", "text": "Embedding as string"}
{"id": "doc_002", "embedding": [0.1, "text", 0.7, 0.2], "text": "Mixed types in embedding"}
{"id": "doc_003", "embedding": [], "text": "Empty embedding array"}
{"id": "doc_004", "embedding": [0.1], "text": "Single dimension embedding"}
{"id": "doc_005", "embedding": null, "text": "Null embedding"}
{"id": "doc_006", "embedding": [0.1, 0.2, null, 0.4], "text": "Null value in embedding"}
{"id": "doc_007", "embedding": [0.1, 0.2, "NaN", 0.4], "text": "String NaN in embedding"}
{"id": "doc_008", "embedding": [0.1, 0.2, Infinity, 0.4], "text": "Infinity in embedding"}

View File

@@ -0,0 +1,5 @@
{"id": "doc_001", "embedding": [0.1, -0.3, 0.7, "text": "Valid line"}
{"id": "doc_002", "embedding": [0.5, 0.1, -0.2, 0.8], "text": "Missing closing brace"
{"id": "doc_003" "embedding": [0.3, 0.4, 0.1, -0.1], "text": "Missing colon after id"}
{id: "doc_004", "embedding": [0.2, 0.1, 0.3, 0.4], "text": "Unquoted key"}
{"id": "doc_005", "embedding": [0.1, 0.2, 0.3, 0.4], "text": "Valid line again"}

View File

@@ -0,0 +1,3 @@
{"id": "doc_001", "text": "Sample text without embedding field", "category": "test"}
{"id": "doc_002", "text": "Another text without embedding", "category": "test"}
{"id": "doc_003", "text": "Third text missing embedding", "category": "test"}

View File

@@ -0,0 +1,3 @@
{"id": "doc_001", "embedding": [0.1, -0.3, 0.7, 0.2], "category": "test"}
{"id": "doc_002", "embedding": [0.5, 0.1, -0.2, 0.8], "category": "test"}
{"id": "doc_003", "embedding": [0.3, 0.4, 0.1, -0.1], "category": "test"}

View File

@@ -0,0 +1,4 @@
[
{"id": "doc_001", "embedding": [0.1, -0.3, 0.7, 0.2], "text": "Regular JSON array"},
{"id": "doc_002", "embedding": [0.5, 0.1, -0.2, 0.8], "text": "Instead of NDJSON"}
]

View File

@@ -0,0 +1,40 @@
{"index": {"_index": "embeddings", "_id": "doc_001"}}
{"id": "doc_001", "embedding": [0.2, -0.1, 0.8, 0.3, -0.5, 0.7, 0.1, -0.3], "text": "Machine learning algorithms are transforming healthcare by enabling predictive analytics and personalized medicine.", "category": "technology", "subcategory": "healthcare", "tags": ["ai", "medicine", "prediction"]}
{"index": {"_index": "embeddings", "_id": "doc_002"}}
{"id": "doc_002", "embedding": [0.1, 0.4, -0.2, 0.6, 0.3, -0.4, 0.8, 0.2], "text": "Climate change poses significant challenges to global food security and agricultural sustainability.", "category": "environment", "subcategory": "agriculture", "tags": ["climate", "food", "sustainability"]}
{"index": {"_index": "embeddings", "_id": "doc_003"}}
{"id": "doc_003", "embedding": [-0.3, 0.7, 0.1, -0.2, 0.9, 0.4, -0.1, 0.5], "text": "The rise of electric vehicles is reshaping the automotive industry and urban transportation systems.", "category": "technology", "subcategory": "automotive", "tags": ["electric", "transport", "urban"]}
{"index": {"_index": "embeddings", "_id": "doc_004"}}
{"id": "doc_004", "embedding": [0.5, -0.6, 0.3, 0.8, -0.2, 0.1, 0.7, -0.4], "text": "Renewable energy sources like solar and wind are becoming increasingly cost-competitive with fossil fuels.", "category": "environment", "subcategory": "energy", "tags": ["renewable", "solar", "wind"]}
{"index": {"_index": "embeddings", "_id": "doc_005"}}
{"id": "doc_005", "embedding": [0.8, 0.2, -0.5, 0.1, 0.6, -0.3, 0.4, 0.9], "text": "Financial markets are experiencing volatility due to geopolitical tensions and inflation concerns.", "category": "finance", "subcategory": "markets", "tags": ["volatility", "inflation", "geopolitics"]}
{"index": {"_index": "embeddings", "_id": "doc_006"}}
{"id": "doc_006", "embedding": [-0.1, 0.5, 0.7, -0.4, 0.2, 0.8, -0.6, 0.3], "text": "Quantum computing research is advancing rapidly with potential applications in cryptography and drug discovery.", "category": "technology", "subcategory": "research", "tags": ["quantum", "cryptography", "research"]}
{"index": {"_index": "embeddings", "_id": "doc_007"}}
{"id": "doc_007", "embedding": [0.4, -0.3, 0.6, 0.7, -0.8, 0.2, 0.5, -0.1], "text": "Ocean pollution from plastic waste is threatening marine ecosystems and biodiversity worldwide.", "category": "environment", "subcategory": "marine", "tags": ["pollution", "plastic", "marine"]}
{"index": {"_index": "embeddings", "_id": "doc_008"}}
{"id": "doc_008", "embedding": [0.3, 0.8, -0.2, 0.5, 0.1, -0.7, 0.6, 0.4], "text": "Artificial intelligence is revolutionizing customer service through chatbots and automated support systems.", "category": "technology", "subcategory": "customer_service", "tags": ["ai", "chatbots", "automation"]}
{"index": {"_index": "embeddings", "_id": "doc_009"}}
{"id": "doc_009", "embedding": [-0.5, 0.3, 0.9, -0.1, 0.7, 0.4, -0.2, 0.8], "text": "Global supply chains are being redesigned for resilience after pandemic-related disruptions.", "category": "business", "subcategory": "logistics", "tags": ["supply_chain", "pandemic", "resilience"]}
{"index": {"_index": "embeddings", "_id": "doc_010"}}
{"id": "doc_010", "embedding": [0.7, -0.4, 0.2, 0.9, -0.3, 0.6, 0.1, -0.8], "text": "Space exploration missions are expanding our understanding of the solar system and potential for life.", "category": "science", "subcategory": "space", "tags": ["space", "exploration", "life"]}
{"index": {"_index": "embeddings", "_id": "doc_011"}}
{"id": "doc_011", "embedding": [-0.2, 0.6, 0.4, -0.7, 0.8, 0.3, -0.5, 0.1], "text": "Cryptocurrency adoption is growing among institutional investors despite regulatory uncertainties.", "category": "finance", "subcategory": "crypto", "tags": ["cryptocurrency", "institutional", "regulation"]}
{"index": {"_index": "embeddings", "_id": "doc_012"}}
{"id": "doc_012", "embedding": [0.6, 0.1, -0.8, 0.4, 0.5, -0.2, 0.9, -0.3], "text": "Remote work technologies are transforming traditional office environments and work-life balance.", "category": "technology", "subcategory": "workplace", "tags": ["remote", "work", "balance"]}
{"index": {"_index": "embeddings", "_id": "doc_013"}}
{"id": "doc_013", "embedding": [0.1, -0.7, 0.5, 0.8, -0.4, 0.3, 0.2, 0.6], "text": "Gene therapy breakthroughs are offering new hope for treating previously incurable genetic diseases.", "category": "science", "subcategory": "medicine", "tags": ["gene_therapy", "genetics", "medicine"]}
{"index": {"_index": "embeddings", "_id": "doc_014"}}
{"id": "doc_014", "embedding": [-0.4, 0.2, 0.7, -0.1, 0.9, -0.6, 0.3, 0.5], "text": "Urban planning is evolving to create more sustainable and livable cities for growing populations.", "category": "environment", "subcategory": "urban", "tags": ["urban_planning", "sustainability", "cities"]}
{"index": {"_index": "embeddings", "_id": "doc_015"}}
{"id": "doc_015", "embedding": [0.9, -0.1, 0.3, 0.6, -0.5, 0.8, -0.2, 0.4], "text": "Social media platforms are implementing new policies to combat misinformation and protect user privacy.", "category": "technology", "subcategory": "social_media", "tags": ["social_media", "misinformation", "privacy"]}
{"index": {"_index": "embeddings", "_id": "doc_016"}}
{"id": "doc_016", "embedding": [-0.3, 0.8, -0.1, 0.4, 0.7, -0.5, 0.6, -0.9], "text": "Educational technology is personalizing learning experiences and improving student outcomes.", "category": "education", "subcategory": "technology", "tags": ["education", "personalization", "technology"]}
{"index": {"_index": "embeddings", "_id": "doc_017"}}
{"id": "doc_017", "embedding": [0.5, 0.3, -0.6, 0.2, 0.8, 0.1, -0.4, 0.7], "text": "Biodiversity conservation efforts are critical for maintaining ecosystem balance and preventing species extinction.", "category": "environment", "subcategory": "conservation", "tags": ["biodiversity", "conservation", "extinction"]}
{"index": {"_index": "embeddings", "_id": "doc_018"}}
{"id": "doc_018", "embedding": [0.2, -0.8, 0.4, 0.7, -0.1, 0.5, 0.9, -0.3], "text": "Healthcare systems are adopting telemedicine to improve access and reduce costs for patients.", "category": "technology", "subcategory": "healthcare", "tags": ["telemedicine", "healthcare", "access"]}
{"index": {"_index": "embeddings", "_id": "doc_019"}}
{"id": "doc_019", "embedding": [-0.7, 0.4, 0.8, -0.2, 0.3, 0.6, -0.1, 0.9], "text": "Autonomous vehicles are being tested extensively with promises of safer and more efficient transportation.", "category": "technology", "subcategory": "automotive", "tags": ["autonomous", "safety", "efficiency"]}
{"index": {"_index": "embeddings", "_id": "doc_020"}}
{"id": "doc_020", "embedding": [0.4, 0.7, -0.3, 0.9, -0.6, 0.2, 0.5, -0.1], "text": "Mental health awareness is increasing with new approaches to therapy and workplace wellness programs.", "category": "health", "subcategory": "mental", "tags": ["mental_health", "therapy", "wellness"]}

View File

@@ -0,0 +1,20 @@
{"index": {"_index": "prompts", "_id": "prompt_001"}}
{"id": "prompt_001", "embedding": [0.15, -0.28, 0.65, 0.42, -0.11, 0.33, 0.78, -0.52], "text": "Find articles about machine learning applications", "category": "search", "subcategory": "technology", "tags": ["AI", "research"]}
{"index": {"_index": "prompts", "_id": "prompt_002"}}
{"id": "prompt_002", "embedding": [0.72, 0.18, -0.35, 0.51, 0.09, -0.44, 0.27, 0.63], "text": "Show me product reviews for smartphones", "category": "search", "subcategory": "product", "tags": ["mobile", "reviews"]}
{"index": {"_index": "prompts", "_id": "prompt_003"}}
{"id": "prompt_003", "embedding": [-0.21, 0.59, 0.34, -0.67, 0.45, 0.12, -0.38, 0.76], "text": "What are the latest political developments?", "category": "search", "subcategory": "news", "tags": ["politics", "current events"]}
{"index": {"_index": "prompts", "_id": "prompt_004"}}
{"id": "prompt_004", "embedding": [0.48, -0.15, 0.72, 0.31, -0.58, 0.24, 0.67, -0.39], "text": "Summarize recent tech industry trends", "category": "analysis", "subcategory": "technology", "tags": ["tech", "trends", "summary"]}
{"index": {"_index": "prompts", "_id": "prompt_005"}}
{"id": "prompt_005", "embedding": [-0.33, 0.47, -0.62, 0.28, 0.71, -0.18, 0.54, 0.35], "text": "Compare different smartphone models", "category": "analysis", "subcategory": "product", "tags": ["comparison", "mobile", "evaluation"]}
{"index": {"_index": "prompts", "_id": "prompt_006"}}
{"id": "prompt_006", "embedding": [0.64, 0.21, 0.39, -0.45, 0.13, 0.58, -0.27, 0.74], "text": "Analyze voter sentiment on recent policies", "category": "analysis", "subcategory": "politics", "tags": ["sentiment", "politics", "analysis"]}
{"index": {"_index": "prompts", "_id": "prompt_007"}}
{"id": "prompt_007", "embedding": [0.29, -0.43, 0.56, 0.68, -0.22, 0.37, 0.14, -0.61], "text": "Generate a summary of machine learning research", "category": "generation", "subcategory": "technology", "tags": ["AI", "research", "summary"]}
{"index": {"_index": "prompts", "_id": "prompt_008"}}
{"id": "prompt_008", "embedding": [-0.17, 0.52, -0.48, 0.36, 0.74, -0.29, 0.61, 0.18], "text": "Create a product recommendation report", "category": "generation", "subcategory": "product", "tags": ["recommendation", "report", "analysis"]}
{"index": {"_index": "prompts", "_id": "prompt_009"}}
{"id": "prompt_009", "embedding": [0.55, 0.08, 0.41, -0.37, 0.26, 0.69, -0.14, 0.58], "text": "Write a news brief on election updates", "category": "generation", "subcategory": "news", "tags": ["election", "news", "brief"]}
{"index": {"_index": "prompts", "_id": "prompt_010"}}
{"id": "prompt_010", "embedding": [0.23, -0.59, 0.47, 0.61, -0.35, 0.18, 0.72, -0.26], "text": "Explain how neural networks work", "category": "explanation", "subcategory": "technology", "tags": ["AI", "education", "neural networks"]}

View File

@@ -1,6 +1,6 @@
[project] [project]
name = "embeddingbuddy" name = "embeddingbuddy"
version = "0.2.0" version = "0.3.0"
description = "A Python Dash application for interactive exploration and visualization of embedding vectors through dimensionality reduction techniques." description = "A Python Dash application for interactive exploration and visualization of embedding vectors through dimensionality reduction techniques."
readme = "README.md" readme = "README.md"
requires-python = ">=3.11" requires-python = ">=3.11"
@@ -15,6 +15,7 @@ dependencies = [
"numba>=0.56.4", "numba>=0.56.4",
"openTSNE>=1.0.0", "openTSNE>=1.0.0",
"mypy>=1.17.1", "mypy>=1.17.1",
"opensearch-py>=3.0.0",
] ]
[project.optional-dependencies] [project.optional-dependencies]

View File

@@ -10,6 +10,9 @@ from .ui.callbacks.interactions import InteractionCallbacks
def create_app(): def create_app():
app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP]) app = dash.Dash(__name__, external_stylesheets=[dbc.themes.BOOTSTRAP])
# Allow callbacks to components that are dynamically created in tabs
app.config.suppress_callback_exceptions = True
layout_manager = AppLayout() layout_manager = AppLayout()
app.layout = layout_manager.create_layout() app.layout = layout_manager.create_layout()

View File

@@ -73,6 +73,12 @@ class AppSettings:
HOST = os.getenv("EMBEDDINGBUDDY_HOST", "127.0.0.1") HOST = os.getenv("EMBEDDINGBUDDY_HOST", "127.0.0.1")
PORT = int(os.getenv("EMBEDDINGBUDDY_PORT", "8050")) PORT = int(os.getenv("EMBEDDINGBUDDY_PORT", "8050"))
# OpenSearch Configuration
OPENSEARCH_DEFAULT_SIZE = 100
OPENSEARCH_SAMPLE_SIZE = 5
OPENSEARCH_CONNECTION_TIMEOUT = 30
OPENSEARCH_VERIFY_CERTS = True
# Bootstrap Theme # Bootstrap Theme
EXTERNAL_STYLESHEETS = [ EXTERNAL_STYLESHEETS = [
"https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css" "https://cdn.jsdelivr.net/npm/bootstrap@5.1.3/dist/css/bootstrap.min.css"

View File

@@ -16,11 +16,22 @@ class NDJSONParser:
@staticmethod @staticmethod
def parse_text(text_content: str) -> List[Document]: def parse_text(text_content: str) -> List[Document]:
documents = [] documents = []
for line in text_content.strip().split("\n"): for line_num, line in enumerate(text_content.strip().split("\n"), 1):
if line.strip(): if line.strip():
doc_dict = json.loads(line) try:
doc = NDJSONParser._dict_to_document(doc_dict) doc_dict = json.loads(line)
documents.append(doc) doc = NDJSONParser._dict_to_document(doc_dict)
documents.append(doc)
except json.JSONDecodeError as e:
raise json.JSONDecodeError(
f"Invalid JSON on line {line_num}: {e.msg}", e.doc, e.pos
)
except KeyError as e:
raise KeyError(f"Missing required field {e} on line {line_num}")
except (TypeError, ValueError) as e:
raise ValueError(
f"Invalid data format on line {line_num}: {str(e)}"
)
return documents return documents
@staticmethod @staticmethod
@@ -28,10 +39,33 @@ class NDJSONParser:
if "id" not in doc_dict: if "id" not in doc_dict:
doc_dict["id"] = str(uuid.uuid4()) doc_dict["id"] = str(uuid.uuid4())
# Validate required fields
if "text" not in doc_dict:
raise KeyError("'text'")
if "embedding" not in doc_dict:
raise KeyError("'embedding'")
# Validate embedding format
embedding = doc_dict["embedding"]
if not isinstance(embedding, list):
raise ValueError(
f"Embedding must be a list, got {type(embedding).__name__}"
)
if not embedding:
raise ValueError("Embedding cannot be empty")
# Check that all embedding values are numbers
for i, val in enumerate(embedding):
if not isinstance(val, (int, float)) or val != val: # NaN check
raise ValueError(
f"Embedding contains invalid value at index {i}: {val}"
)
return Document( return Document(
id=doc_dict["id"], id=doc_dict["id"],
text=doc_dict["text"], text=doc_dict["text"],
embedding=doc_dict["embedding"], embedding=embedding,
category=doc_dict.get("category"), category=doc_dict.get("category"),
subcategory=doc_dict.get("subcategory"), subcategory=doc_dict.get("subcategory"),
tags=doc_dict.get("tags"), tags=doc_dict.get("tags"),

View File

@@ -1,6 +1,7 @@
import numpy as np import numpy as np
from typing import List, Optional, Tuple from typing import List, Optional, Tuple
from ..models.schemas import Document, ProcessedData from ..models.schemas import Document, ProcessedData
from ..models.field_mapper import FieldMapper
from .parser import NDJSONParser from .parser import NDJSONParser
@@ -26,6 +27,42 @@ class DataProcessor:
except Exception as e: except Exception as e:
return ProcessedData(documents=[], embeddings=np.array([]), error=str(e)) return ProcessedData(documents=[], embeddings=np.array([]), error=str(e))
def process_opensearch_data(
self, raw_documents: List[dict], field_mapping
) -> ProcessedData:
"""Process raw OpenSearch documents using field mapping."""
try:
# Transform documents using field mapping
transformed_docs = FieldMapper.transform_documents(
raw_documents, field_mapping
)
# Parse transformed documents
documents = []
for doc_dict in transformed_docs:
try:
# Ensure required fields are present with defaults if needed
if "id" not in doc_dict or not doc_dict["id"]:
doc_dict["id"] = f"doc_{len(documents)}"
doc = Document(**doc_dict)
documents.append(doc)
except Exception:
continue # Skip invalid documents
if not documents:
return ProcessedData(
documents=[],
embeddings=np.array([]),
error="No valid documents after transformation",
)
embeddings = self._extract_embeddings(documents)
return ProcessedData(documents=documents, embeddings=embeddings)
except Exception as e:
return ProcessedData(documents=[], embeddings=np.array([]), error=str(e))
def _extract_embeddings(self, documents: List[Document]) -> np.ndarray: def _extract_embeddings(self, documents: List[Document]) -> np.ndarray:
if not documents: if not documents:
return np.array([]) return np.array([])

View File

@@ -0,0 +1,189 @@
from typing import Dict, List, Optional, Any, Tuple
import logging
from opensearchpy import OpenSearch
from opensearchpy.exceptions import OpenSearchException
logger = logging.getLogger(__name__)
class OpenSearchClient:
def __init__(self):
self.client: Optional[OpenSearch] = None
self.connection_info: Optional[Dict[str, Any]] = None
def connect(
self,
url: str,
username: Optional[str] = None,
password: Optional[str] = None,
api_key: Optional[str] = None,
verify_certs: bool = True,
) -> Tuple[bool, str]:
"""
Connect to OpenSearch instance.
Returns:
Tuple of (success: bool, message: str)
"""
try:
# Parse URL to extract host and port
if url.startswith("http://") or url.startswith("https://"):
host = url
else:
host = f"https://{url}"
# Build auth configuration
auth_config = {}
if username and password:
auth_config["http_auth"] = (username, password)
elif api_key:
auth_config["api_key"] = api_key
# Create client
self.client = OpenSearch([host], verify_certs=verify_certs, **auth_config)
# Test connection
info = self.client.info()
self.connection_info = {
"url": host,
"cluster_name": info.get("cluster_name", "Unknown"),
"version": info.get("version", {}).get("number", "Unknown"),
}
return (
True,
f"Connected to {info.get('cluster_name', 'OpenSearch cluster')}",
)
except OpenSearchException as e:
logger.error(f"OpenSearch connection error: {e}")
return False, f"Connection failed: {str(e)}"
except Exception as e:
logger.error(f"Unexpected error connecting to OpenSearch: {e}")
return False, f"Unexpected error: {str(e)}"
def get_index_mapping(self, index_name: str) -> Tuple[bool, Optional[Dict], str]:
"""
Get the mapping for a specific index.
Returns:
Tuple of (success: bool, mapping: Dict or None, message: str)
"""
if not self.client:
return False, None, "Not connected to OpenSearch"
try:
mapping = self.client.indices.get_mapping(index=index_name)
return True, mapping, "Mapping retrieved successfully"
except OpenSearchException as e:
logger.error(f"Error getting mapping for index {index_name}: {e}")
return False, None, f"Failed to get mapping: {str(e)}"
def analyze_fields(self, index_name: str) -> Tuple[bool, Optional[Dict], str]:
"""
Analyze index fields to detect potential embedding and text fields.
Returns:
Tuple of (success: bool, analysis: Dict or None, message: str)
"""
success, mapping, message = self.get_index_mapping(index_name)
if not success:
return False, None, message
try:
# Extract field information from mapping
index_mapping = mapping[index_name]["mappings"]["properties"]
analysis = {
"vector_fields": [],
"text_fields": [],
"keyword_fields": [],
"numeric_fields": [],
"all_fields": [],
}
for field_name, field_info in index_mapping.items():
field_type = field_info.get("type", "unknown")
analysis["all_fields"].append(field_name)
if field_type == "dense_vector":
analysis["vector_fields"].append(
{
"name": field_name,
"dimension": field_info.get("dimension", "unknown"),
}
)
elif field_type == "text":
analysis["text_fields"].append(field_name)
elif field_type == "keyword":
analysis["keyword_fields"].append(field_name)
elif field_type in ["integer", "long", "float", "double"]:
analysis["numeric_fields"].append(field_name)
return True, analysis, "Field analysis completed"
except Exception as e:
logger.error(f"Error analyzing fields: {e}")
return False, None, f"Field analysis failed: {str(e)}"
def fetch_sample_data(
self, index_name: str, size: int = 5
) -> Tuple[bool, List[Dict], str]:
"""
Fetch sample documents from the index.
Returns:
Tuple of (success: bool, documents: List[Dict], message: str)
"""
if not self.client:
return False, [], "Not connected to OpenSearch"
try:
response = self.client.search(
index=index_name, body={"query": {"match_all": {}}, "size": size}
)
documents = [hit["_source"] for hit in response["hits"]["hits"]]
return True, documents, f"Retrieved {len(documents)} sample documents"
except OpenSearchException as e:
logger.error(f"Error fetching sample data: {e}")
return False, [], f"Failed to fetch sample data: {str(e)}"
def fetch_data(
self, index_name: str, size: int = 100
) -> Tuple[bool, List[Dict], str]:
"""
Fetch documents from the index.
Returns:
Tuple of (success: bool, documents: List[Dict], message: str)
"""
if not self.client:
return False, [], "Not connected to OpenSearch"
try:
response = self.client.search(
index=index_name, body={"query": {"match_all": {}}, "size": size}
)
documents = [hit["_source"] for hit in response["hits"]["hits"]]
total_hits = response["hits"]["total"]["value"]
message = f"Retrieved {len(documents)} documents from {total_hits} total"
return True, documents, message
except OpenSearchException as e:
logger.error(f"Error fetching data: {e}")
return False, [], f"Failed to fetch data: {str(e)}"
def disconnect(self):
"""Disconnect from OpenSearch."""
if self.client:
self.client = None
self.connection_info = None
def is_connected(self) -> bool:
"""Check if connected to OpenSearch."""
return self.client is not None

View File

@@ -0,0 +1,254 @@
from dataclasses import dataclass
from typing import Dict, List, Optional, Any
import logging
logger = logging.getLogger(__name__)
@dataclass
class FieldMapping:
"""Configuration for mapping OpenSearch fields to standard format."""
embedding_field: str
text_field: str
id_field: Optional[str] = None
category_field: Optional[str] = None
subcategory_field: Optional[str] = None
tags_field: Optional[str] = None
class FieldMapper:
"""Handles field mapping and data transformation from OpenSearch to standard format."""
@staticmethod
def suggest_mappings(field_analysis: Dict) -> Dict[str, List[str]]:
"""
Suggest field mappings based on field analysis.
Each dropdown will show ALL available fields, but ordered by relevance
with the most likely candidates first.
Args:
field_analysis: Analysis results from OpenSearchClient.analyze_fields
Returns:
Dictionary with suggested fields for each mapping (ordered by relevance)
"""
all_fields = field_analysis.get("all_fields", [])
vector_fields = [vf["name"] for vf in field_analysis.get("vector_fields", [])]
text_fields = field_analysis.get("text_fields", [])
keyword_fields = field_analysis.get("keyword_fields", [])
# Helper function to create ordered suggestions
def create_ordered_suggestions(primary_candidates, all_available_fields):
# Start with primary candidates, then add all other fields
ordered = []
# Add primary candidates first
for field in primary_candidates:
if field in all_available_fields and field not in ordered:
ordered.append(field)
# Add remaining fields
for field in all_available_fields:
if field not in ordered:
ordered.append(field)
return ordered
suggestions = {}
# Embedding field suggestions (vector fields first, then name-based candidates, then all fields)
embedding_candidates = vector_fields.copy()
# Add fields that likely contain embeddings based on name
embedding_name_candidates = [
f
for f in all_fields
if any(
keyword in f.lower()
for keyword in ["embedding", "embeddings", "vector", "vectors", "embed"]
)
]
# Add name-based candidates that aren't already in vector_fields
for candidate in embedding_name_candidates:
if candidate not in embedding_candidates:
embedding_candidates.append(candidate)
suggestions["embedding"] = create_ordered_suggestions(
embedding_candidates, all_fields
)
# Text field suggestions (text fields first, then all fields)
text_candidates = text_fields.copy()
suggestions["text"] = create_ordered_suggestions(text_candidates, all_fields)
# ID field suggestions (ID-like fields first, then all fields)
id_candidates = [
f
for f in keyword_fields
if any(keyword in f.lower() for keyword in ["id", "_id", "doc", "document"])
]
id_candidates.append("_id") # _id is always available
suggestions["id"] = create_ordered_suggestions(id_candidates, all_fields)
# Category field suggestions (category-like fields first, then all fields)
category_candidates = [
f
for f in keyword_fields
if any(
keyword in f.lower()
for keyword in ["category", "class", "type", "label"]
)
]
suggestions["category"] = create_ordered_suggestions(
category_candidates, all_fields
)
# Subcategory field suggestions (subcategory-like fields first, then all fields)
subcategory_candidates = [
f
for f in keyword_fields
if any(
keyword in f.lower()
for keyword in ["subcategory", "subclass", "subtype", "subtopic"]
)
]
suggestions["subcategory"] = create_ordered_suggestions(
subcategory_candidates, all_fields
)
# Tags field suggestions (tag-like fields first, then all fields)
tags_candidates = [
f
for f in keyword_fields
if any(
keyword in f.lower()
for keyword in ["tag", "tags", "keyword", "keywords"]
)
]
suggestions["tags"] = create_ordered_suggestions(tags_candidates, all_fields)
return suggestions
@staticmethod
def validate_mapping(
mapping: FieldMapping, available_fields: List[str]
) -> List[str]:
"""
Validate that the field mapping is correct.
Returns:
List of validation errors (empty if valid)
"""
errors = []
# Required fields validation
if not mapping.embedding_field:
errors.append("Embedding field is required")
elif mapping.embedding_field not in available_fields:
errors.append(
f"Embedding field '{mapping.embedding_field}' not found in index"
)
if not mapping.text_field:
errors.append("Text field is required")
elif mapping.text_field not in available_fields:
errors.append(f"Text field '{mapping.text_field}' not found in index")
# Optional fields validation
optional_fields = {
"id_field": mapping.id_field,
"category_field": mapping.category_field,
"subcategory_field": mapping.subcategory_field,
"tags_field": mapping.tags_field,
}
for field_name, field_value in optional_fields.items():
if field_value and field_value not in available_fields:
errors.append(
f"Field '{field_value}' for {field_name} not found in index"
)
return errors
@staticmethod
def transform_documents(
documents: List[Dict[str, Any]], mapping: FieldMapping
) -> List[Dict[str, Any]]:
"""
Transform OpenSearch documents to standard format using field mapping.
Args:
documents: Raw documents from OpenSearch
mapping: Field mapping configuration
Returns:
List of transformed documents in standard format
"""
transformed = []
for doc in documents:
try:
# Build standard format document
standard_doc = {}
# Required fields
if mapping.embedding_field in doc:
standard_doc["embedding"] = doc[mapping.embedding_field]
else:
logger.warning(
f"Missing embedding field '{mapping.embedding_field}' in document"
)
continue
if mapping.text_field in doc:
standard_doc["text"] = str(doc[mapping.text_field])
else:
logger.warning(
f"Missing text field '{mapping.text_field}' in document"
)
continue
# Optional fields
if mapping.id_field and mapping.id_field in doc:
standard_doc["id"] = str(doc[mapping.id_field])
if mapping.category_field and mapping.category_field in doc:
standard_doc["category"] = str(doc[mapping.category_field])
if mapping.subcategory_field and mapping.subcategory_field in doc:
standard_doc["subcategory"] = str(doc[mapping.subcategory_field])
if mapping.tags_field and mapping.tags_field in doc:
tags = doc[mapping.tags_field]
# Handle both string and list tags
if isinstance(tags, list):
standard_doc["tags"] = [str(tag) for tag in tags]
else:
standard_doc["tags"] = [str(tags)]
transformed.append(standard_doc)
except Exception as e:
logger.error(f"Error transforming document: {e}")
continue
logger.info(f"Transformed {len(transformed)} documents out of {len(documents)}")
return transformed
@staticmethod
def create_mapping_from_dict(mapping_dict: Dict[str, str]) -> FieldMapping:
"""
Create a FieldMapping from a dictionary.
Args:
mapping_dict: Dictionary with field mappings
Returns:
FieldMapping instance
"""
return FieldMapping(
embedding_field=mapping_dict.get("embedding", ""),
text_field=mapping_dict.get("text", ""),
id_field=mapping_dict.get("id") or None,
category_field=mapping_dict.get("category") or None,
subcategory_field=mapping_dict.get("subcategory") or None,
tags_field=mapping_dict.get("tags") or None,
)

View File

@@ -1,38 +1,60 @@
from dash import callback, Input, Output, State from dash import callback, Input, Output, State, no_update
from ...data.processor import DataProcessor from ...data.processor import DataProcessor
from ...data.sources.opensearch import OpenSearchClient
from ...models.field_mapper import FieldMapper
from ...config.settings import AppSettings
class DataProcessingCallbacks: class DataProcessingCallbacks:
def __init__(self): def __init__(self):
self.processor = DataProcessor() self.processor = DataProcessor()
self.opensearch_client_data = OpenSearchClient() # For data/documents
self.opensearch_client_prompts = OpenSearchClient() # For prompts
self._register_callbacks() self._register_callbacks()
def _register_callbacks(self): def _register_callbacks(self):
@callback( @callback(
Output("processed-data", "data"), [
Output("processed-data", "data", allow_duplicate=True),
Output("upload-error-alert", "children", allow_duplicate=True),
Output("upload-error-alert", "is_open", allow_duplicate=True),
],
Input("upload-data", "contents"), Input("upload-data", "contents"),
State("upload-data", "filename"), State("upload-data", "filename"),
prevent_initial_call=True,
) )
def process_uploaded_file(contents, filename): def process_uploaded_file(contents, filename):
if contents is None: if contents is None:
return None return None, "", False
processed_data = self.processor.process_upload(contents, filename) processed_data = self.processor.process_upload(contents, filename)
if processed_data.error: if processed_data.error:
return {"error": processed_data.error} error_message = self._format_error_message(
processed_data.error, filename
)
return (
{"error": processed_data.error},
error_message,
True, # Show error alert
)
return { return (
"documents": [ {
self._document_to_dict(doc) for doc in processed_data.documents "documents": [
], self._document_to_dict(doc) for doc in processed_data.documents
"embeddings": processed_data.embeddings.tolist(), ],
} "embeddings": processed_data.embeddings.tolist(),
},
"",
False, # Hide error alert
)
@callback( @callback(
Output("processed-prompts", "data"), Output("processed-prompts", "data", allow_duplicate=True),
Input("upload-prompts", "contents"), Input("upload-prompts", "contents"),
State("upload-prompts", "filename"), State("upload-prompts", "filename"),
prevent_initial_call=True,
) )
def process_uploaded_prompts(contents, filename): def process_uploaded_prompts(contents, filename):
if contents is None: if contents is None:
@@ -50,6 +72,397 @@ class DataProcessingCallbacks:
"embeddings": processed_data.embeddings.tolist(), "embeddings": processed_data.embeddings.tolist(),
} }
# OpenSearch callbacks
@callback(
[
Output("tab-content", "children"),
],
[Input("data-source-tabs", "active_tab")],
prevent_initial_call=False,
)
def render_tab_content(active_tab):
from ...ui.components.datasource import DataSourceComponent
datasource = DataSourceComponent()
if active_tab == "opensearch-tab":
return [datasource.create_opensearch_tab()]
else:
return [datasource.create_file_upload_tab()]
# Register callbacks for both data and prompts sections
self._register_opensearch_callbacks("data", self.opensearch_client_data)
self._register_opensearch_callbacks("prompts", self.opensearch_client_prompts)
# Register collapsible section callbacks
self._register_collapse_callbacks()
def _register_opensearch_callbacks(self, section_type, opensearch_client):
"""Register callbacks for a specific section (data or prompts)."""
@callback(
Output(f"{section_type}-auth-collapse", "is_open"),
[Input(f"{section_type}-auth-toggle", "n_clicks")],
[State(f"{section_type}-auth-collapse", "is_open")],
prevent_initial_call=True,
)
def toggle_auth(n_clicks, is_open):
if n_clicks:
return not is_open
return is_open
@callback(
Output(f"{section_type}-auth-toggle", "children"),
[Input(f"{section_type}-auth-collapse", "is_open")],
prevent_initial_call=False,
)
def update_auth_button_text(is_open):
return "Hide Authentication" if is_open else "Show Authentication"
@callback(
[
Output(f"{section_type}-connection-status", "children"),
Output(f"{section_type}-field-mapping-section", "children"),
Output(f"{section_type}-field-mapping-section", "style"),
Output(f"{section_type}-load-data-section", "style"),
Output(f"{section_type}-load-opensearch-data-btn", "disabled"),
Output(f"{section_type}-embedding-field-dropdown", "options"),
Output(f"{section_type}-text-field-dropdown", "options"),
Output(f"{section_type}-id-field-dropdown", "options"),
Output(f"{section_type}-category-field-dropdown", "options"),
Output(f"{section_type}-subcategory-field-dropdown", "options"),
Output(f"{section_type}-tags-field-dropdown", "options"),
],
[Input(f"{section_type}-test-connection-btn", "n_clicks")],
[
State(f"{section_type}-opensearch-url", "value"),
State(f"{section_type}-opensearch-index", "value"),
State(f"{section_type}-opensearch-username", "value"),
State(f"{section_type}-opensearch-password", "value"),
State(f"{section_type}-opensearch-api-key", "value"),
],
prevent_initial_call=True,
)
def test_opensearch_connection(
n_clicks, url, index_name, username, password, api_key
):
if not n_clicks or not url or not index_name:
return (
no_update,
no_update,
no_update,
no_update,
no_update,
no_update,
no_update,
no_update,
no_update,
no_update,
no_update,
)
# Test connection
success, message = opensearch_client.connect(
url=url,
username=username,
password=password,
api_key=api_key,
verify_certs=AppSettings.OPENSEARCH_VERIFY_CERTS,
)
if not success:
return (
self._create_status_alert(f"{message}", "danger"),
[],
{"display": "none"},
{"display": "none"},
True,
[], # empty options for hidden dropdowns
[],
[],
[],
[],
[],
)
# Analyze fields
success, field_analysis, analysis_message = (
opensearch_client.analyze_fields(index_name)
)
if not success:
return (
self._create_status_alert(f"{analysis_message}", "danger"),
[],
{"display": "none"},
{"display": "none"},
True,
[], # empty options for hidden dropdowns
[],
[],
[],
[],
[],
)
# Generate field suggestions
field_suggestions = FieldMapper.suggest_mappings(field_analysis)
from ...ui.components.datasource import DataSourceComponent
datasource = DataSourceComponent()
field_mapping_ui = datasource.create_field_mapping_interface(
field_suggestions, section_type
)
return (
self._create_status_alert(f"{message}", "success"),
field_mapping_ui,
{"display": "block"},
{"display": "block"},
False,
[
{"label": field, "value": field}
for field in field_suggestions.get("embedding", [])
],
[
{"label": field, "value": field}
for field in field_suggestions.get("text", [])
],
[
{"label": field, "value": field}
for field in field_suggestions.get("id", [])
],
[
{"label": field, "value": field}
for field in field_suggestions.get("category", [])
],
[
{"label": field, "value": field}
for field in field_suggestions.get("subcategory", [])
],
[
{"label": field, "value": field}
for field in field_suggestions.get("tags", [])
],
)
# Determine output target based on section type
output_target = (
"processed-data" if section_type == "data" else "processed-prompts"
)
@callback(
[
Output(output_target, "data", allow_duplicate=True),
Output("opensearch-success-alert", "children", allow_duplicate=True),
Output("opensearch-success-alert", "is_open", allow_duplicate=True),
Output("opensearch-error-alert", "children", allow_duplicate=True),
Output("opensearch-error-alert", "is_open", allow_duplicate=True),
],
[Input(f"{section_type}-load-opensearch-data-btn", "n_clicks")],
[
State(f"{section_type}-opensearch-index", "value"),
State(f"{section_type}-opensearch-query-size", "value"),
State(f"{section_type}-embedding-field-dropdown-ui", "value"),
State(f"{section_type}-text-field-dropdown-ui", "value"),
State(f"{section_type}-id-field-dropdown-ui", "value"),
State(f"{section_type}-category-field-dropdown-ui", "value"),
State(f"{section_type}-subcategory-field-dropdown-ui", "value"),
State(f"{section_type}-tags-field-dropdown-ui", "value"),
],
prevent_initial_call=True,
)
def load_opensearch_data(
n_clicks,
index_name,
query_size,
embedding_field,
text_field,
id_field,
category_field,
subcategory_field,
tags_field,
):
if not n_clicks or not index_name or not embedding_field or not text_field:
return no_update, no_update, no_update, no_update, no_update
try:
# Validate and set query size
if not query_size or query_size < 1:
query_size = AppSettings.OPENSEARCH_DEFAULT_SIZE
elif query_size > 1000:
query_size = 1000 # Cap at reasonable maximum
# Create field mapping
field_mapping = FieldMapper.create_mapping_from_dict(
{
"embedding": embedding_field,
"text": text_field,
"id": id_field,
"category": category_field,
"subcategory": subcategory_field,
"tags": tags_field,
}
)
# Fetch data from OpenSearch
success, raw_documents, message = opensearch_client.fetch_data(
index_name, size=query_size
)
if not success:
return (
no_update,
"",
False,
f"❌ Failed to fetch {section_type}: {message}",
True,
)
# Process the data
processed_data = self.processor.process_opensearch_data(
raw_documents, field_mapping
)
if processed_data.error:
return (
{"error": processed_data.error},
"",
False,
f"{section_type.title()} processing error: {processed_data.error}",
True,
)
success_message = f"✅ Successfully loaded {len(processed_data.documents)} {section_type} from OpenSearch"
# Format for appropriate target (data vs prompts)
if section_type == "data":
return (
{
"documents": [
self._document_to_dict(doc)
for doc in processed_data.documents
],
"embeddings": processed_data.embeddings.tolist(),
},
success_message,
True,
"",
False,
)
else: # prompts
return (
{
"prompts": [
self._document_to_dict(doc)
for doc in processed_data.documents
],
"embeddings": processed_data.embeddings.tolist(),
},
success_message,
True,
"",
False,
)
except Exception as e:
return (no_update, "", False, f"❌ Unexpected error: {str(e)}", True)
# Sync callbacks to update hidden dropdowns from UI dropdowns
@callback(
Output(f"{section_type}-embedding-field-dropdown", "value"),
Input(f"{section_type}-embedding-field-dropdown-ui", "value"),
prevent_initial_call=True,
)
def sync_embedding_dropdown(value):
return value
@callback(
Output(f"{section_type}-text-field-dropdown", "value"),
Input(f"{section_type}-text-field-dropdown-ui", "value"),
prevent_initial_call=True,
)
def sync_text_dropdown(value):
return value
@callback(
Output(f"{section_type}-id-field-dropdown", "value"),
Input(f"{section_type}-id-field-dropdown-ui", "value"),
prevent_initial_call=True,
)
def sync_id_dropdown(value):
return value
@callback(
Output(f"{section_type}-category-field-dropdown", "value"),
Input(f"{section_type}-category-field-dropdown-ui", "value"),
prevent_initial_call=True,
)
def sync_category_dropdown(value):
return value
@callback(
Output(f"{section_type}-subcategory-field-dropdown", "value"),
Input(f"{section_type}-subcategory-field-dropdown-ui", "value"),
prevent_initial_call=True,
)
def sync_subcategory_dropdown(value):
return value
@callback(
Output(f"{section_type}-tags-field-dropdown", "value"),
Input(f"{section_type}-tags-field-dropdown-ui", "value"),
prevent_initial_call=True,
)
def sync_tags_dropdown(value):
return value
def _register_collapse_callbacks(self):
"""Register callbacks for collapsible sections."""
# Data section collapse callback
@callback(
[
Output("data-collapse", "is_open"),
Output("data-collapse-icon", "className"),
],
[Input("data-collapse-toggle", "n_clicks")],
[State("data-collapse", "is_open")],
prevent_initial_call=True,
)
def toggle_data_collapse(n_clicks, is_open):
if n_clicks:
new_state = not is_open
icon_class = (
"fas fa-chevron-down me-2"
if new_state
else "fas fa-chevron-right me-2"
)
return new_state, icon_class
return is_open, "fas fa-chevron-down me-2"
# Prompts section collapse callback
@callback(
[
Output("prompts-collapse", "is_open"),
Output("prompts-collapse-icon", "className"),
],
[Input("prompts-collapse-toggle", "n_clicks")],
[State("prompts-collapse", "is_open")],
prevent_initial_call=True,
)
def toggle_prompts_collapse(n_clicks, is_open):
if n_clicks:
new_state = not is_open
icon_class = (
"fas fa-chevron-down me-2"
if new_state
else "fas fa-chevron-right me-2"
)
return new_state, icon_class
return is_open, "fas fa-chevron-down me-2"
@staticmethod @staticmethod
def _document_to_dict(doc): def _document_to_dict(doc):
return { return {
@@ -60,3 +473,51 @@ class DataProcessingCallbacks:
"subcategory": doc.subcategory, "subcategory": doc.subcategory,
"tags": doc.tags, "tags": doc.tags,
} }
@staticmethod
def _format_error_message(error: str, filename: str | None = None) -> str:
"""Format error message with helpful guidance for users."""
file_part = f" in file '{filename}'" if filename else ""
# Check for common error patterns and provide helpful messages
if "embedding" in error.lower() and (
"key" in error.lower() or "required field" in error.lower()
):
return (
f"❌ Missing 'embedding' field{file_part}. "
"Each line must contain an 'embedding' field with a list of numbers."
)
elif "text" in error.lower() and (
"key" in error.lower() or "required field" in error.lower()
):
return (
f"❌ Missing 'text' field{file_part}. "
"Each line must contain a 'text' field with the document content."
)
elif "json" in error.lower() and "decode" in error.lower():
return (
f"❌ Invalid JSON format{file_part}. "
"Please check that each line is valid JSON with proper syntax (quotes, braces, etc.)."
)
elif "unicode" in error.lower() or "decode" in error.lower():
return (
f"❌ File encoding issue{file_part}. "
"Please ensure the file is saved in UTF-8 format and contains no binary data."
)
elif "array" in error.lower() or "list" in error.lower():
return (
f"❌ Invalid embedding format{file_part}. "
"Embeddings must be arrays/lists of numbers, not strings or other types."
)
else:
return (
f"❌ Error processing file{file_part}: {error}. "
"Please check that your file is valid NDJSON with required 'text' and 'embedding' fields."
)
@staticmethod
def _create_status_alert(message: str, color: str):
"""Create a status alert component."""
import dash_bootstrap_components as dbc
return dbc.Alert(message, color=color, className="mb-2")

View File

@@ -0,0 +1,519 @@
from dash import dcc, html
import dash_bootstrap_components as dbc
from .upload import UploadComponent
class DataSourceComponent:
def __init__(self):
self.upload_component = UploadComponent()
def create_tabbed_interface(self):
"""Create tabbed interface for different data sources."""
return dbc.Card(
[
dbc.CardHeader(
[
dbc.Tabs(
[
dbc.Tab(label="File Upload", tab_id="file-tab"),
dbc.Tab(label="OpenSearch", tab_id="opensearch-tab"),
],
id="data-source-tabs",
active_tab="file-tab",
)
]
),
dbc.CardBody([html.Div(id="tab-content")]),
]
)
def create_file_upload_tab(self):
"""Create file upload tab content."""
return html.Div(
[
self.upload_component.create_error_alert(),
self.upload_component.create_data_upload(),
self.upload_component.create_prompts_upload(),
self.upload_component.create_reset_button(),
]
)
def create_opensearch_tab(self):
"""Create OpenSearch tab content with separate Data and Prompts sections."""
return html.Div(
[
# Data Section
dbc.Card(
[
dbc.CardHeader(
[
dbc.Button(
[
html.I(
className="fas fa-chevron-down me-2",
id="data-collapse-icon",
),
"📄 Documents/Data",
],
id="data-collapse-toggle",
color="link",
className="text-start p-0 w-100 text-decoration-none",
style={
"border": "none",
"font-size": "1.25rem",
"font-weight": "500",
},
),
]
),
dbc.Collapse(
[dbc.CardBody([self._create_opensearch_section("data")])],
id="data-collapse",
is_open=True,
),
],
className="mb-4",
),
# Prompts Section
dbc.Card(
[
dbc.CardHeader(
[
dbc.Button(
[
html.I(
className="fas fa-chevron-down me-2",
id="prompts-collapse-icon",
),
"💬 Prompts",
],
id="prompts-collapse-toggle",
color="link",
className="text-start p-0 w-100 text-decoration-none",
style={
"border": "none",
"font-size": "1.25rem",
"font-weight": "500",
},
),
]
),
dbc.Collapse(
[
dbc.CardBody(
[self._create_opensearch_section("prompts")]
)
],
id="prompts-collapse",
is_open=True,
),
],
className="mb-4",
),
# Hidden dropdowns to prevent callback errors (for both sections)
html.Div(
[
# Data dropdowns (hidden sync targets)
dcc.Dropdown(
id="data-embedding-field-dropdown",
style={"display": "none"},
),
dcc.Dropdown(
id="data-text-field-dropdown", style={"display": "none"}
),
dcc.Dropdown(
id="data-id-field-dropdown", style={"display": "none"}
),
dcc.Dropdown(
id="data-category-field-dropdown", style={"display": "none"}
),
dcc.Dropdown(
id="data-subcategory-field-dropdown",
style={"display": "none"},
),
dcc.Dropdown(
id="data-tags-field-dropdown", style={"display": "none"}
),
# Data UI dropdowns (hidden placeholders)
dcc.Dropdown(
id="data-embedding-field-dropdown-ui",
style={"display": "none"},
),
dcc.Dropdown(
id="data-text-field-dropdown-ui", style={"display": "none"}
),
dcc.Dropdown(
id="data-id-field-dropdown-ui", style={"display": "none"}
),
dcc.Dropdown(
id="data-category-field-dropdown-ui",
style={"display": "none"},
),
dcc.Dropdown(
id="data-subcategory-field-dropdown-ui",
style={"display": "none"},
),
dcc.Dropdown(
id="data-tags-field-dropdown-ui", style={"display": "none"}
),
# Prompts dropdowns (hidden sync targets)
dcc.Dropdown(
id="prompts-embedding-field-dropdown",
style={"display": "none"},
),
dcc.Dropdown(
id="prompts-text-field-dropdown", style={"display": "none"}
),
dcc.Dropdown(
id="prompts-id-field-dropdown", style={"display": "none"}
),
dcc.Dropdown(
id="prompts-category-field-dropdown",
style={"display": "none"},
),
dcc.Dropdown(
id="prompts-subcategory-field-dropdown",
style={"display": "none"},
),
dcc.Dropdown(
id="prompts-tags-field-dropdown", style={"display": "none"}
),
# Prompts UI dropdowns (hidden placeholders)
dcc.Dropdown(
id="prompts-embedding-field-dropdown-ui",
style={"display": "none"},
),
dcc.Dropdown(
id="prompts-text-field-dropdown-ui",
style={"display": "none"},
),
dcc.Dropdown(
id="prompts-id-field-dropdown-ui", style={"display": "none"}
),
dcc.Dropdown(
id="prompts-category-field-dropdown-ui",
style={"display": "none"},
),
dcc.Dropdown(
id="prompts-subcategory-field-dropdown-ui",
style={"display": "none"},
),
dcc.Dropdown(
id="prompts-tags-field-dropdown-ui",
style={"display": "none"},
),
],
style={"display": "none"},
),
]
)
def _create_opensearch_section(self, section_type):
"""Create a complete OpenSearch section for either 'data' or 'prompts'."""
section_id = section_type # 'data' or 'prompts'
return html.Div(
[
# Connection section
html.H6("Connection", className="mb-2"),
dbc.Row(
[
dbc.Col(
[
dbc.Label("OpenSearch URL:"),
dbc.Input(
id=f"{section_id}-opensearch-url",
type="text",
placeholder="https://opensearch.example.com:9200",
className="mb-2",
),
],
width=12,
),
]
),
dbc.Row(
[
dbc.Col(
[
dbc.Label("Index Name:"),
dbc.Input(
id=f"{section_id}-opensearch-index",
type="text",
placeholder="my-embeddings-index",
className="mb-2",
),
],
width=6,
),
dbc.Col(
[
dbc.Label("Query Size:"),
dbc.Input(
id=f"{section_id}-opensearch-query-size",
type="number",
value=100,
min=1,
max=1000,
placeholder="100",
className="mb-2",
),
],
width=6,
),
]
),
dbc.Row(
[
dbc.Col(
[
dbc.Button(
"Test Connection",
id=f"{section_id}-test-connection-btn",
color="primary",
className="mb-3",
),
],
width=12,
),
]
),
# Authentication section (collapsible)
dbc.Collapse(
[
html.Hr(),
html.H6("Authentication (Optional)", className="mb-2"),
dbc.Row(
[
dbc.Col(
[
dbc.Label("Username:"),
dbc.Input(
id=f"{section_id}-opensearch-username",
type="text",
className="mb-2",
),
],
width=6,
),
dbc.Col(
[
dbc.Label("Password:"),
dbc.Input(
id=f"{section_id}-opensearch-password",
type="password",
className="mb-2",
),
],
width=6,
),
]
),
dbc.Label("OR"),
dbc.Input(
id=f"{section_id}-opensearch-api-key",
type="text",
placeholder="API Key",
className="mb-2",
),
],
id=f"{section_id}-auth-collapse",
is_open=False,
),
dbc.Button(
"Show Authentication",
id=f"{section_id}-auth-toggle",
color="link",
size="sm",
className="p-0 mb-3",
),
# Connection status
html.Div(id=f"{section_id}-connection-status", className="mb-3"),
# Field mapping section (hidden initially)
html.Div(
id=f"{section_id}-field-mapping-section", style={"display": "none"}
),
# Load data button (hidden initially)
html.Div(
[
dbc.Button(
f"Load {section_type.title()}",
id=f"{section_id}-load-opensearch-data-btn",
color="success",
className="mb-2",
disabled=True,
),
],
id=f"{section_id}-load-data-section",
style={"display": "none"},
),
# OpenSearch status/results
html.Div(id=f"{section_id}-opensearch-status", className="mb-3"),
]
)
def create_field_mapping_interface(self, field_suggestions, section_type="data"):
"""Create field mapping interface based on detected fields."""
return html.Div(
[
html.Hr(),
html.H6("Field Mapping", className="mb-2"),
html.P(
"Map your OpenSearch fields to the required format:",
className="text-muted small",
),
# Required fields
dbc.Row(
[
dbc.Col(
[
dbc.Label(
"Embedding Field (required):", className="fw-bold"
),
dcc.Dropdown(
id=f"{section_type}-embedding-field-dropdown-ui",
options=[
{"label": field, "value": field}
for field in field_suggestions.get(
"embedding", []
)
],
value=field_suggestions.get("embedding", [None])[
0
], # Default to first suggestion
placeholder="Select embedding field...",
className="mb-2",
),
],
width=6,
),
dbc.Col(
[
dbc.Label(
"Text Field (required):", className="fw-bold"
),
dcc.Dropdown(
id=f"{section_type}-text-field-dropdown-ui",
options=[
{"label": field, "value": field}
for field in field_suggestions.get("text", [])
],
value=field_suggestions.get("text", [None])[
0
], # Default to first suggestion
placeholder="Select text field...",
className="mb-2",
),
],
width=6,
),
]
),
# Optional fields
html.H6("Optional Fields", className="mb-2 mt-3"),
dbc.Row(
[
dbc.Col(
[
dbc.Label("ID Field:"),
dcc.Dropdown(
id=f"{section_type}-id-field-dropdown-ui",
options=[
{"label": field, "value": field}
for field in field_suggestions.get("id", [])
],
value=field_suggestions.get("id", [None])[
0
], # Default to first suggestion
placeholder="Select ID field...",
className="mb-2",
),
],
width=6,
),
dbc.Col(
[
dbc.Label("Category Field:"),
dcc.Dropdown(
id=f"{section_type}-category-field-dropdown-ui",
options=[
{"label": field, "value": field}
for field in field_suggestions.get(
"category", []
)
],
value=field_suggestions.get("category", [None])[
0
], # Default to first suggestion
placeholder="Select category field...",
className="mb-2",
),
],
width=6,
),
]
),
dbc.Row(
[
dbc.Col(
[
dbc.Label("Subcategory Field:"),
dcc.Dropdown(
id=f"{section_type}-subcategory-field-dropdown-ui",
options=[
{"label": field, "value": field}
for field in field_suggestions.get(
"subcategory", []
)
],
value=field_suggestions.get("subcategory", [None])[
0
], # Default to first suggestion
placeholder="Select subcategory field...",
className="mb-2",
),
],
width=6,
),
dbc.Col(
[
dbc.Label("Tags Field:"),
dcc.Dropdown(
id=f"{section_type}-tags-field-dropdown-ui",
options=[
{"label": field, "value": field}
for field in field_suggestions.get("tags", [])
],
value=field_suggestions.get("tags", [None])[
0
], # Default to first suggestion
placeholder="Select tags field...",
className="mb-2",
),
],
width=6,
),
]
),
]
)
def create_error_alert(self):
"""Create error alert component for OpenSearch issues."""
return dbc.Alert(
id="opensearch-error-alert",
dismissable=True,
is_open=False,
color="danger",
className="mb-3",
)
def create_success_alert(self):
"""Create success alert component for OpenSearch operations."""
return dbc.Alert(
id="opensearch-success-alert",
dismissable=True,
is_open=False,
color="success",
className="mb-3",
)

View File

@@ -1,24 +1,28 @@
from dash import dcc, html from dash import dcc, html
import dash_bootstrap_components as dbc import dash_bootstrap_components as dbc
from .upload import UploadComponent from .upload import UploadComponent
from .datasource import DataSourceComponent
class SidebarComponent: class SidebarComponent:
def __init__(self): def __init__(self):
self.upload_component = UploadComponent() self.upload_component = UploadComponent()
self.datasource_component = DataSourceComponent()
def create_layout(self): def create_layout(self):
return dbc.Col( return dbc.Col(
[ [
html.H5("Upload Data", className="mb-3"), html.H5("Data Sources", className="mb-3"),
self.upload_component.create_data_upload(), self.datasource_component.create_error_alert(),
self.upload_component.create_prompts_upload(), self.datasource_component.create_success_alert(),
self.upload_component.create_reset_button(), self.datasource_component.create_tabbed_interface(),
html.H5("Visualization Controls", className="mb-3"), html.H5("Visualization Controls", className="mb-3 mt-4"),
self._create_method_dropdown(), ]
self._create_color_dropdown(), + self._create_method_dropdown()
self._create_dimension_toggle(), + self._create_color_dropdown()
self._create_prompts_toggle(), + self._create_dimension_toggle()
+ self._create_prompts_toggle()
+ [
html.H5("Point Details", className="mb-3"), html.H5("Point Details", className="mb-3"),
html.Div( html.Div(
id="point-details", children="Click on a point to see details" id="point-details", children="Click on a point to see details"

View File

@@ -51,3 +51,14 @@ class UploadComponent:
className="mb-3", className="mb-3",
style={"width": "100%"}, style={"width": "100%"},
) )
@staticmethod
def create_error_alert():
"""Create error alert component for data upload issues."""
return dbc.Alert(
id="upload-error-alert",
dismissable=True,
is_open=False,
color="danger",
className="mb-3",
)

View File

@@ -9,7 +9,8 @@ class AppLayout:
def create_layout(self): def create_layout(self):
return dbc.Container( return dbc.Container(
[self._create_header(), self._create_main_content(), self._create_stores()], [self._create_header(), self._create_main_content()]
+ self._create_stores(),
fluid=True, fluid=True,
) )

197
tests/test_bad_data.py Normal file
View File

@@ -0,0 +1,197 @@
"""Tests for handling bad/invalid data files."""
import pytest
import json
import base64
from src.embeddingbuddy.data.parser import NDJSONParser
from src.embeddingbuddy.data.processor import DataProcessor
class TestBadDataHandling:
"""Test suite for various types of invalid input data."""
def setup_method(self):
"""Set up test fixtures."""
self.parser = NDJSONParser()
self.processor = DataProcessor()
def _create_upload_contents(self, text_content: str) -> str:
"""Helper to create upload contents format."""
encoded = base64.b64encode(text_content.encode("utf-8")).decode("utf-8")
return f"data:application/json;base64,{encoded}"
def test_missing_embedding_field(self):
"""Test files missing required embedding field."""
bad_content = '{"id": "doc_001", "text": "Sample text", "category": "test"}'
with pytest.raises(KeyError, match="embedding"):
self.parser.parse_text(bad_content)
# Test processor error handling
upload_contents = self._create_upload_contents(bad_content)
result = self.processor.process_upload(upload_contents)
assert result.error is not None
assert "embedding" in result.error
def test_missing_text_field(self):
"""Test files missing required text field."""
bad_content = (
'{"id": "doc_001", "embedding": [0.1, 0.2, 0.3], "category": "test"}'
)
with pytest.raises(KeyError, match="text"):
self.parser.parse_text(bad_content)
# Test processor error handling
upload_contents = self._create_upload_contents(bad_content)
result = self.processor.process_upload(upload_contents)
assert result.error is not None
assert "text" in result.error
def test_malformed_json_lines(self):
"""Test files with malformed JSON syntax."""
# Missing closing brace
bad_content = '{"id": "doc_001", "embedding": [0.1, 0.2], "text": "test"'
with pytest.raises(json.JSONDecodeError):
self.parser.parse_text(bad_content)
# Test processor error handling
upload_contents = self._create_upload_contents(bad_content)
result = self.processor.process_upload(upload_contents)
assert result.error is not None
def test_invalid_embedding_types(self):
"""Test files with invalid embedding data types."""
test_cases = [
# String instead of array
'{"id": "doc_001", "embedding": "not_an_array", "text": "test"}',
# Mixed types in array
'{"id": "doc_002", "embedding": [0.1, "text", 0.3], "text": "test"}',
# Empty array
'{"id": "doc_003", "embedding": [], "text": "test"}',
# Null embedding
'{"id": "doc_004", "embedding": null, "text": "test"}',
]
for bad_content in test_cases:
upload_contents = self._create_upload_contents(bad_content)
result = self.processor.process_upload(upload_contents)
assert result.error is not None, f"Should fail for: {bad_content}"
def test_inconsistent_embedding_dimensions(self):
"""Test files with embeddings of different dimensions."""
bad_content = """{"id": "doc_001", "embedding": [0.1, 0.2, 0.3, 0.4], "text": "4D embedding"}
{"id": "doc_002", "embedding": [0.1, 0.2, 0.3], "text": "3D embedding"}"""
upload_contents = self._create_upload_contents(bad_content)
result = self.processor.process_upload(upload_contents)
# This might succeed parsing but fail in processing
# The error depends on where dimension validation occurs
if result.error is None:
# If parsing succeeds, check that embeddings have inconsistent shapes
assert len(result.documents) == 2
assert len(result.documents[0].embedding) != len(
result.documents[1].embedding
)
def test_empty_lines_in_ndjson(self):
"""Test files with empty lines mixed in."""
content_with_empty_lines = """{"id": "doc_001", "embedding": [0.1, 0.2], "text": "First line"}
{"id": "doc_002", "embedding": [0.3, 0.4], "text": "After empty line"}"""
# This should work - empty lines should be skipped
documents = self.parser.parse_text(content_with_empty_lines)
assert len(documents) == 2
assert documents[0].id == "doc_001"
assert documents[1].id == "doc_002"
def test_not_ndjson_format(self):
"""Test regular JSON array instead of NDJSON."""
json_array = """[
{"id": "doc_001", "embedding": [0.1, 0.2], "text": "First"},
{"id": "doc_002", "embedding": [0.3, 0.4], "text": "Second"}
]"""
with pytest.raises(json.JSONDecodeError):
self.parser.parse_text(json_array)
def test_binary_content_in_file(self):
"""Test files with binary content mixed in."""
# Simulate binary content that can't be decoded
binary_content = (
b'\x00\x01\x02{"id": "doc_001", "embedding": [0.1], "text": "test"}'
)
# This should result in an error when processing
encoded = base64.b64encode(binary_content).decode("utf-8")
upload_contents = f"data:application/json;base64,{encoded}"
result = self.processor.process_upload(upload_contents)
# Should either fail with UnicodeDecodeError or JSON parsing error
assert result.error is not None
def test_extremely_large_embeddings(self):
"""Test embeddings with very large dimensions."""
large_embedding = [0.1] * 10000 # 10k dimensions
content = json.dumps(
{
"id": "doc_001",
"embedding": large_embedding,
"text": "Large embedding test",
}
)
# This should work but might be slow
upload_contents = self._create_upload_contents(content)
result = self.processor.process_upload(upload_contents)
if result.error is None:
assert len(result.documents) == 1
assert len(result.documents[0].embedding) == 10000
def test_special_characters_in_text(self):
"""Test handling of special characters and unicode."""
special_content = json.dumps(
{
"id": "doc_001",
"embedding": [0.1, 0.2],
"text": 'Special chars: 🚀 ñoñó 中文 \n\t"',
},
ensure_ascii=False,
)
upload_contents = self._create_upload_contents(special_content)
result = self.processor.process_upload(upload_contents)
assert result.error is None
assert len(result.documents) == 1
assert "🚀" in result.documents[0].text
def test_processor_error_structure(self):
"""Test that processor returns proper error structure."""
bad_content = '{"invalid": "json"' # Missing closing brace
upload_contents = self._create_upload_contents(bad_content)
result = self.processor.process_upload(upload_contents)
# Check error structure
assert result.error is not None
assert isinstance(result.error, str)
assert len(result.documents) == 0
assert result.embeddings.size == 0
def test_multiple_errors_in_file(self):
"""Test file with multiple different types of errors."""
multi_error_content = """{"id": "doc_001", "text": "Missing embedding"}
{"id": "doc_002", "embedding": "wrong_type", "text": "Wrong embedding type"}
{"id": "doc_003", "embedding": [0.1, 0.2], "text": "Valid line"}
{"id": "doc_004", "embedding": [0.3, 0.4]""" # Missing text and closing brace
upload_contents = self._create_upload_contents(multi_error_content)
result = self.processor.process_upload(upload_contents)
# Should fail on first error encountered
assert result.error is not None

View File

@@ -0,0 +1,155 @@
from unittest.mock import patch
from src.embeddingbuddy.data.processor import DataProcessor
from src.embeddingbuddy.models.field_mapper import FieldMapping
class TestDataProcessorOpenSearch:
def test_process_opensearch_data_success(self):
processor = DataProcessor()
# Mock raw OpenSearch documents
raw_documents = [
{
"vector": [0.1, 0.2, 0.3],
"content": "Test document 1",
"doc_id": "doc1",
"type": "news",
},
{
"vector": [0.4, 0.5, 0.6],
"content": "Test document 2",
"doc_id": "doc2",
"type": "blog",
},
]
# Create field mapping
field_mapping = FieldMapping(
embedding_field="vector",
text_field="content",
id_field="doc_id",
category_field="type",
)
# Process the data
processed_data = processor.process_opensearch_data(raw_documents, field_mapping)
# Assertions
assert processed_data.error is None
assert len(processed_data.documents) == 2
assert processed_data.embeddings.shape == (2, 3)
# Check first document
doc1 = processed_data.documents[0]
assert doc1.text == "Test document 1"
assert doc1.embedding == [0.1, 0.2, 0.3]
assert doc1.id == "doc1"
assert doc1.category == "news"
# Check second document
doc2 = processed_data.documents[1]
assert doc2.text == "Test document 2"
assert doc2.embedding == [0.4, 0.5, 0.6]
assert doc2.id == "doc2"
assert doc2.category == "blog"
def test_process_opensearch_data_with_tags(self):
processor = DataProcessor()
# Mock raw OpenSearch documents with tags
raw_documents = [
{
"vector": [0.1, 0.2, 0.3],
"content": "Test document with tags",
"keywords": ["tag1", "tag2"],
}
]
# Create field mapping
field_mapping = FieldMapping(
embedding_field="vector", text_field="content", tags_field="keywords"
)
processed_data = processor.process_opensearch_data(raw_documents, field_mapping)
assert processed_data.error is None
assert len(processed_data.documents) == 1
doc = processed_data.documents[0]
assert doc.tags == ["tag1", "tag2"]
def test_process_opensearch_data_invalid_documents(self):
processor = DataProcessor()
# Mock raw documents with missing required fields
raw_documents = [
{
"vector": [0.1, 0.2, 0.3],
# Missing text field
}
]
field_mapping = FieldMapping(embedding_field="vector", text_field="content")
processed_data = processor.process_opensearch_data(raw_documents, field_mapping)
# Should return error since no valid documents
assert processed_data.error is not None
assert "No valid documents" in processed_data.error
assert len(processed_data.documents) == 0
def test_process_opensearch_data_partial_success(self):
processor = DataProcessor()
# Mix of valid and invalid documents
raw_documents = [
{
"vector": [0.1, 0.2, 0.3],
"content": "Valid document",
},
{
"vector": [0.4, 0.5, 0.6],
# Missing content field - should be skipped
},
{
"vector": [0.7, 0.8, 0.9],
"content": "Another valid document",
},
]
field_mapping = FieldMapping(embedding_field="vector", text_field="content")
processed_data = processor.process_opensearch_data(raw_documents, field_mapping)
# Should process valid documents only
assert processed_data.error is None
assert len(processed_data.documents) == 2
assert processed_data.documents[0].text == "Valid document"
assert processed_data.documents[1].text == "Another valid document"
@patch("src.embeddingbuddy.models.field_mapper.FieldMapper.transform_documents")
def test_process_opensearch_data_transformation_error(self, mock_transform):
processor = DataProcessor()
# Mock transformation error
mock_transform.side_effect = Exception("Transformation failed")
raw_documents = [{"vector": [0.1], "content": "test"}]
field_mapping = FieldMapping(embedding_field="vector", text_field="content")
processed_data = processor.process_opensearch_data(raw_documents, field_mapping)
assert processed_data.error is not None
assert "Transformation failed" in processed_data.error
assert len(processed_data.documents) == 0
def test_process_opensearch_data_empty_input(self):
processor = DataProcessor()
raw_documents = []
field_mapping = FieldMapping(embedding_field="vector", text_field="content")
processed_data = processor.process_opensearch_data(raw_documents, field_mapping)
assert processed_data.error is not None
assert "No valid documents" in processed_data.error
assert len(processed_data.documents) == 0

310
tests/test_opensearch.py Normal file
View File

@@ -0,0 +1,310 @@
from unittest.mock import Mock, patch
from src.embeddingbuddy.data.sources.opensearch import OpenSearchClient
from src.embeddingbuddy.models.field_mapper import FieldMapper, FieldMapping
class TestOpenSearchClient:
def test_init(self):
client = OpenSearchClient()
assert client.client is None
assert client.connection_info is None
@patch("src.embeddingbuddy.data.sources.opensearch.OpenSearch")
def test_connect_success(self, mock_opensearch):
# Mock the OpenSearch client
mock_client_instance = Mock()
mock_client_instance.info.return_value = {
"cluster_name": "test-cluster",
"version": {"number": "2.0.0"},
}
mock_opensearch.return_value = mock_client_instance
client = OpenSearchClient()
success, message = client.connect("https://localhost:9200")
assert success is True
assert "test-cluster" in message
assert client.client is not None
assert client.connection_info["cluster_name"] == "test-cluster"
@patch("src.embeddingbuddy.data.sources.opensearch.OpenSearch")
def test_connect_failure(self, mock_opensearch):
# Mock connection failure
mock_opensearch.side_effect = Exception("Connection failed")
client = OpenSearchClient()
success, message = client.connect("https://localhost:9200")
assert success is False
assert "Connection failed" in message
assert client.client is None
def test_analyze_fields(self):
client = OpenSearchClient()
client.client = Mock()
# Mock mapping response
mock_mapping = {
"test-index": {
"mappings": {
"properties": {
"embedding": {"type": "dense_vector", "dimension": 768},
"text": {"type": "text"},
"category": {"type": "keyword"},
"id": {"type": "keyword"},
"count": {"type": "integer"},
}
}
}
}
client.client.indices.get_mapping.return_value = mock_mapping
success, analysis, message = client.analyze_fields("test-index")
assert success is True
assert len(analysis["vector_fields"]) == 1
assert analysis["vector_fields"][0]["name"] == "embedding"
assert analysis["vector_fields"][0]["dimension"] == 768
assert "text" in analysis["text_fields"]
assert "category" in analysis["keyword_fields"]
assert "count" in analysis["numeric_fields"]
def test_fetch_sample_data(self):
client = OpenSearchClient()
client.client = Mock()
# Mock search response
mock_response = {
"hits": {
"hits": [
{"_source": {"text": "doc1", "embedding": [0.1, 0.2]}},
{"_source": {"text": "doc2", "embedding": [0.3, 0.4]}},
]
}
}
client.client.search.return_value = mock_response
success, documents, message = client.fetch_sample_data("test-index", size=2)
assert success is True
assert len(documents) == 2
assert documents[0]["text"] == "doc1"
assert documents[1]["text"] == "doc2"
class TestFieldMapper:
def test_suggest_mappings(self):
field_analysis = {
"vector_fields": [{"name": "embedding", "dimension": 768}],
"text_fields": ["content", "description"],
"keyword_fields": ["doc_id", "category", "type", "tags"],
"numeric_fields": ["count"],
"all_fields": [
"embedding",
"content",
"description",
"doc_id",
"category",
"type",
"tags",
"count",
],
}
suggestions = FieldMapper.suggest_mappings(field_analysis)
# Check that all dropdowns contain all fields
all_fields = [
"embedding",
"content",
"description",
"doc_id",
"category",
"type",
"tags",
"count",
]
for field_type in [
"embedding",
"text",
"id",
"category",
"subcategory",
"tags",
]:
for field in all_fields:
assert field in suggestions[field_type], (
f"Field '{field}' missing from {field_type} suggestions"
)
# Check that best candidates are first
assert (
suggestions["embedding"][0] == "embedding"
) # vector field should be first
assert suggestions["text"][0] in [
"content",
"description",
] # text fields should be first
assert suggestions["id"][0] == "doc_id" # ID-like field should be first
assert suggestions["category"][0] in [
"category",
"type",
] # category-like field should be first
assert suggestions["tags"][0] == "tags" # tags field should be first
def test_suggest_mappings_name_based_embedding(self):
"""Test that fields named 'embedding' are prioritized even without vector type."""
field_analysis = {
"vector_fields": [], # No explicit vector fields detected
"text_fields": ["content", "description"],
"keyword_fields": ["doc_id", "category", "type", "tags"],
"numeric_fields": ["count"],
"all_fields": [
"content",
"description",
"doc_id",
"category",
"embedding",
"type",
"tags",
"count",
],
}
suggestions = FieldMapper.suggest_mappings(field_analysis)
# Check that 'embedding' field is prioritized despite not being detected as vector type
assert suggestions["embedding"][0] == "embedding", (
"Field named 'embedding' should be first priority"
)
# Check that all fields are still available
all_fields = [
"content",
"description",
"doc_id",
"category",
"embedding",
"type",
"tags",
"count",
]
for field_type in [
"embedding",
"text",
"id",
"category",
"subcategory",
"tags",
]:
for field in all_fields:
assert field in suggestions[field_type], (
f"Field '{field}' missing from {field_type} suggestions"
)
def test_validate_mapping_success(self):
mapping = FieldMapping(
embedding_field="embedding", text_field="text", id_field="doc_id"
)
available_fields = ["embedding", "text", "doc_id", "category"]
errors = FieldMapper.validate_mapping(mapping, available_fields)
assert len(errors) == 0
def test_validate_mapping_missing_required(self):
mapping = FieldMapping(embedding_field="missing_field", text_field="text")
available_fields = ["text", "category"]
errors = FieldMapper.validate_mapping(mapping, available_fields)
assert len(errors) == 1
assert "missing_field" in errors[0]
assert "not found" in errors[0]
def test_validate_mapping_missing_optional(self):
mapping = FieldMapping(
embedding_field="embedding",
text_field="text",
category_field="missing_category",
)
available_fields = ["embedding", "text"]
errors = FieldMapper.validate_mapping(mapping, available_fields)
assert len(errors) == 1
assert "missing_category" in errors[0]
def test_transform_documents(self):
mapping = FieldMapping(
embedding_field="vector",
text_field="content",
id_field="doc_id",
category_field="type",
)
raw_documents = [
{
"vector": [0.1, 0.2, 0.3],
"content": "Test document 1",
"doc_id": "doc1",
"type": "news",
},
{
"vector": [0.4, 0.5, 0.6],
"content": "Test document 2",
"doc_id": "doc2",
"type": "blog",
},
]
transformed = FieldMapper.transform_documents(raw_documents, mapping)
assert len(transformed) == 2
assert transformed[0]["embedding"] == [0.1, 0.2, 0.3]
assert transformed[0]["text"] == "Test document 1"
assert transformed[0]["id"] == "doc1"
assert transformed[0]["category"] == "news"
def test_transform_documents_missing_required(self):
mapping = FieldMapping(embedding_field="vector", text_field="content")
raw_documents = [
{
"vector": [0.1, 0.2, 0.3],
# Missing content field
}
]
transformed = FieldMapper.transform_documents(raw_documents, mapping)
assert len(transformed) == 0 # Document should be skipped
def test_create_mapping_from_dict(self):
mapping_dict = {
"embedding": "vector_field",
"text": "text_field",
"id": "doc_id",
"category": "cat_field",
"subcategory": "subcat_field",
"tags": "tags_field",
}
mapping = FieldMapper.create_mapping_from_dict(mapping_dict)
assert mapping.embedding_field == "vector_field"
assert mapping.text_field == "text_field"
assert mapping.id_field == "doc_id"
assert mapping.category_field == "cat_field"
assert mapping.subcategory_field == "subcat_field"
assert mapping.tags_field == "tags_field"
def test_create_mapping_from_dict_minimal(self):
mapping_dict = {"embedding": "vector_field", "text": "text_field"}
mapping = FieldMapper.create_mapping_from_dict(mapping_dict)
assert mapping.embedding_field == "vector_field"
assert mapping.text_field == "text_field"
assert mapping.id_field is None
assert mapping.category_field is None

28
uv.lock generated
View File

@@ -412,7 +412,7 @@ wheels = [
[[package]] [[package]]
name = "embeddingbuddy" name = "embeddingbuddy"
version = "0.2.0" version = "0.3.0"
source = { editable = "." } source = { editable = "." }
dependencies = [ dependencies = [
{ name = "dash" }, { name = "dash" },
@@ -420,6 +420,7 @@ dependencies = [
{ name = "mypy" }, { name = "mypy" },
{ name = "numba" }, { name = "numba" },
{ name = "numpy" }, { name = "numpy" },
{ name = "opensearch-py" },
{ name = "opentsne" }, { name = "opentsne" },
{ name = "pandas" }, { name = "pandas" },
{ name = "plotly" }, { name = "plotly" },
@@ -471,6 +472,7 @@ requires-dist = [
{ name = "mypy", marker = "extra == 'lint'", specifier = ">=1.5.0" }, { name = "mypy", marker = "extra == 'lint'", specifier = ">=1.5.0" },
{ name = "numba", specifier = ">=0.56.4" }, { name = "numba", specifier = ">=0.56.4" },
{ name = "numpy", specifier = ">=1.24.4" }, { name = "numpy", specifier = ">=1.24.4" },
{ name = "opensearch-py", specifier = ">=3.0.0" },
{ name = "opentsne", specifier = ">=1.0.0" }, { name = "opentsne", specifier = ">=1.0.0" },
{ name = "pandas", specifier = ">=2.1.4" }, { name = "pandas", specifier = ">=2.1.4" },
{ name = "pip-audit", marker = "extra == 'security'", specifier = ">=2.6.0" }, { name = "pip-audit", marker = "extra == 'security'", specifier = ">=2.6.0" },
@@ -484,6 +486,14 @@ requires-dist = [
] ]
provides-extras = ["test", "lint", "security", "dev", "all"] provides-extras = ["test", "lint", "security", "dev", "all"]
[[package]]
name = "events"
version = "0.5"
source = { registry = "https://pypi.org/simple" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/25/ed/e47dec0626edd468c84c04d97769e7ab4ea6457b7f54dcb3f72b17fcd876/Events-0.5-py3-none-any.whl", hash = "sha256:a7286af378ba3e46640ac9825156c93bdba7502174dd696090fdfcd4d80a1abd", size = 6758, upload-time = "2023-07-31T08:23:13.645Z" },
]
[[package]] [[package]]
name = "filelock" name = "filelock"
version = "3.16.1" version = "3.16.1"
@@ -913,6 +923,22 @@ wheels = [
{ url = "https://files.pythonhosted.org/packages/67/0e/35082d13c09c02c011cf21570543d202ad929d961c02a147493cb0c2bdf5/numpy-2.2.6-cp313-cp313t-win_amd64.whl", hash = "sha256:6031dd6dfecc0cf9f668681a37648373bddd6421fff6c66ec1624eed0180ee06", size = 12771374, upload-time = "2025-05-17T21:43:35.479Z" }, { url = "https://files.pythonhosted.org/packages/67/0e/35082d13c09c02c011cf21570543d202ad929d961c02a147493cb0c2bdf5/numpy-2.2.6-cp313-cp313t-win_amd64.whl", hash = "sha256:6031dd6dfecc0cf9f668681a37648373bddd6421fff6c66ec1624eed0180ee06", size = 12771374, upload-time = "2025-05-17T21:43:35.479Z" },
] ]
[[package]]
name = "opensearch-py"
version = "3.0.0"
source = { registry = "https://pypi.org/simple" }
dependencies = [
{ name = "certifi" },
{ name = "events" },
{ name = "python-dateutil" },
{ name = "requests" },
{ name = "urllib3" },
]
sdist = { url = "https://files.pythonhosted.org/packages/b8/58/ecec7f855aae7bcfb08f570088c6cb993f68c361a0727abab35dbf021acb/opensearch_py-3.0.0.tar.gz", hash = "sha256:ebb38f303f8a3f794db816196315bcddad880be0dc75094e3334bc271db2ed39", size = 248890, upload-time = "2025-06-17T05:39:48.453Z" }
wheels = [
{ url = "https://files.pythonhosted.org/packages/71/e0/69fd114c607b0323d3f864ab4a5ecb87d76ec5a172d2e36a739c8baebea1/opensearch_py-3.0.0-py3-none-any.whl", hash = "sha256:842bf5d56a4a0d8290eda9bb921c50f3080e5dc4e5fefb9c9648289da3f6a8bb", size = 371491, upload-time = "2025-06-17T05:39:46.539Z" },
]
[[package]] [[package]]
name = "opentsne" name = "opentsne"
version = "1.0.2" version = "1.0.2"