Phase 3: Embedding & Vector Pipeline - Implementation Guide¶
Overview¶
This guide walks you through implementing the embedding pipeline that converts imported data into searchable vector embeddings.
What We're Building¶
Flow: GCS Import → Chunking → Embedding Generation → Vector Storage → Search
┌─────────────┐      ┌──────────────┐      ┌───────────────┐      ┌────────────────┐
│ GCS Bucket  │ ───▶ │  Chunking    │ ───▶ │  Vertex AI    │ ───▶ │  Vector DB     │
│  (Imports)  │      │ (Text Split) │      │ (Embeddings)  │      │ (Search Index) │
└─────────────┘      └──────────────┘      └───────────────┘      └────────────────┘
Step 1: Install Dependencies¶
What this does: Installs Vertex AI SDK for generating embeddings.
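The install command itself isn't shown above; a typical invocation, assuming the standard package names for the Vertex AI SDK and GCS client:

```shell
# Install the Vertex AI SDK and the Cloud Storage client
# (package names assumed: google-cloud-aiplatform, google-cloud-storage)
pip install google-cloud-aiplatform google-cloud-storage
```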
Step 2: Test the Embedding Processor Locally¶
What happens:
1. Reads the email JSON from GCS
2. Extracts text content
3. Splits into 1000-character chunks (with 200-char overlap)
4. Sends chunks to Vertex AI's text-embedding-004 model
5. Gets back 768-dimensional vectors
6. Stores embeddings in GCS at _embeddings/ folder
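The chunking step (1000-character windows, 200-character overlap, breaking at sentence boundaries where possible) can be sketched in pure Python. `chunk_text` is a hypothetical helper for illustration, not the actual `EmbeddingProcessor` internals:

```python
def chunk_text(text: str, chunk_size: int = 1000, overlap: int = 200) -> list[str]:
    """Split text into overlapping chunks, preferring to break at a period."""
    chunks = []
    start = 0
    while start < len(text):
        end = min(start + chunk_size, len(text))
        if end < len(text):
            # Break at the last period inside the window, if one exists.
            last_period = text.rfind(".", start, end)
            if last_period > start:
                end = last_period + 1
        chunks.append(text[start:end])
        if end >= len(text):
            break
        # Step back by `overlap`, but always make forward progress.
        start = max(end - overlap, start + 1)
    return chunks

chunks = chunk_text("x" * 2500)
print(len(chunks))                          # 3 chunks for 2500 characters
print(chunks[0][-200:] == chunks[1][:200])  # True: overlap is preserved
```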
Expected output:
{
  "blob_name": "importer-gmail/20260131_221352_0000.json",
  "chunks": 2,
  "embeddings_count": 2,
  "embeddings_path": "gs://myindependent-ai-data/_embeddings/...",
  "success": true
}
Step 3: Process All Imported Emails¶
Create a batch processor script:
# Create batch processing script
cat > scripts/process_all_embeddings.py << 'EOF'
from google.cloud import storage
from embedding.processor import EmbeddingProcessor
# Initialize
processor = EmbeddingProcessor()
client = storage.Client(project='myindependent-ai')
bucket = client.bucket('myindependent-ai-data')
# Get all Gmail imports
blobs = list(bucket.list_blobs(prefix='importer-gmail/', delimiter='/'))
json_files = [b.name for b in blobs if b.name.endswith('.json') and '.gitkeep' not in b.name]
print(f"Found {len(json_files)} files to process")
# Process in batch
result = processor.process_batch(json_files)
print("\nResults:")
print(f" Total: {result['total']}")
print(f" Successful: {result['successful']}")
print(f" Failed: {result['failed']}")
EOF
# Run it
python scripts/process_all_embeddings.py
What this does: Processes all 100 Gmail imports, generating ~200-300 embedding vectors total.
Time: ~2-3 minutes for 100 emails
Step 4: Create Vertex AI Vector Search Index¶
Now we'll create a searchable index from the embeddings:
# Create index creation script
cat > scripts/create_vector_index.py << 'EOF'
from google.cloud import aiplatform
from google.cloud import storage
import json
PROJECT_ID = 'myindependent-ai'
REGION = 'europe-west3'
DISPLAY_NAME = 'myindependent-ai-index'
# Initialize
aiplatform.init(project=PROJECT_ID, location=REGION)
# Create index
index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name=DISPLAY_NAME,
    dimensions=768,  # text-embedding-004 dimensions
    approximate_neighbors_count=10,
    distance_measure_type="DOT_PRODUCT_DISTANCE",
    leaf_node_embedding_count=500,
    leaf_nodes_to_search_percent=5,
)
print(f"Index created: {index.resource_name}")
print(f"Index ID: {index.name}")
# Save index ID for later use
with open('vector_index_id.txt', 'w') as f:
    f.write(index.name)
print("\nIndex is being created (this takes 20-30 minutes)...")
print("Check status at: https://console.cloud.google.com/vertex-ai/matching-engine/indexes")
EOF
python scripts/create_vector_index.py
What this does:
- Creates a Vector Search index optimized for semantic similarity
- Uses the Tree-AH algorithm (fast, approximate nearest-neighbor search)
- Configured for 768-dimensional embeddings
- Takes 20-30 minutes to build
Note: The index creation is async. You'll get an ID immediately, but the index builds in the background.
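A note on the distance measure: with unit-normalized embedding vectors, `DOT_PRODUCT_DISTANCE` ranks neighbors the same way as cosine similarity. A minimal illustration with toy vectors (not the actual index):

```python
import math

def dot(a, b):
    return sum(x * y for x, y in zip(a, b))

def normalize(v):
    n = math.sqrt(dot(v, v))
    return [x / n for x in v]

# Two toy "embeddings": the query points in nearly the same direction as doc1.
query = normalize([1.0, 2.0, 3.0])
doc1 = normalize([1.1, 2.1, 2.9])
doc2 = normalize([-3.0, 0.5, 0.1])

# Higher dot product on unit vectors = higher cosine similarity = nearer neighbor.
print(dot(query, doc1) > dot(query, doc2))  # True
```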
Step 5: Deploy Index Endpoint¶
Once the index is ready (check console), deploy it:
cat > scripts/deploy_index_endpoint.py << 'EOF'
from google.cloud import aiplatform
PROJECT_ID = 'myindependent-ai'
REGION = 'europe-west3'
aiplatform.init(project=PROJECT_ID, location=REGION)
# Read index ID
with open('vector_index_id.txt') as f:
    index_id = f.read().strip()
# Create endpoint
endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name="myindependent-ai-endpoint",
    public_endpoint_enabled=True,
)
print(f"Endpoint created: {endpoint.resource_name}")
# Deploy index to endpoint (deploy_index expects a MatchingEngineIndex
# object, not a raw ID string)
index = aiplatform.MatchingEngineIndex(index_id)
deployed_index = endpoint.deploy_index(
    index=index,
    deployed_index_id="myindependent_ai_deployed",
    machine_type="e2-standard-2",
    min_replica_count=1,
    max_replica_count=1,
)
print("\nIndex deployed!")
print(f"Endpoint ID: {endpoint.name}")
# Save for later
with open('vector_endpoint_id.txt', 'w') as f:
    f.write(endpoint.name)
EOF
python scripts/deploy_index_endpoint.py
What this does:
- Creates a public endpoint (queryable via API)
- Deploys the index to the endpoint
- Uses e2-standard-2 machine (small, cost-effective)
- Takes 5-10 minutes to deploy
Step 6: Test Semantic Search¶
cat > scripts/test_search.py << 'EOF'
from google.cloud import aiplatform
from vertexai.language_models import TextEmbeddingModel
PROJECT_ID = 'myindependent-ai'
REGION = 'europe-west3'
aiplatform.init(project=PROJECT_ID, location=REGION)
# Load endpoint
with open('vector_endpoint_id.txt') as f:
endpoint_id = f.read().strip()
endpoint = aiplatform.MatchingEngineIndexEndpoint(endpoint_id)
# Generate query embedding
model = TextEmbeddingModel.from_pretrained("text-embedding-004")
query = "security alerts from Google"
query_embedding = model.get_embeddings([query])[0].values
# Search
matches = endpoint.find_neighbors(
    deployed_index_id="myindependent_ai_deployed",
    queries=[query_embedding],
    num_neighbors=5,
)
print(f"Query: {query}\n")
print("Top 5 matches:")
for idx, match in enumerate(matches[0], 1):
    print(f"{idx}. Distance: {match.distance:.4f}")
    print(f"   ID: {match.id}")
    print(f"   Metadata: {match.restricts}")
    print()
EOF
python scripts/test_search.py
Expected output:
Query: security alerts from Google
Top 5 matches:
1. Distance: 0.8523
   ID: importer-gmail/20260131_221352_0000_chunk_0
   Metadata: {'subject': 'Security alert', 'sender': 'Google'}
2. Distance: 0.7812
   ID: importer-gmail/20260131_221352_0034_chunk_0
...
Architecture Decisions Explained¶
Why Vertex AI Vector Search?¶
- Fully managed: No infrastructure to maintain
- Scales automatically: Handles millions of vectors
- Fast: Sub-100ms queries
- Integrated: Works seamlessly with Vertex AI embeddings
- Cost-effective: Pay only for what you use
Why text-embedding-004?¶
- Best quality: Latest Google model (Jan 2024)
- 768 dimensions: Good balance of quality vs. size
- Fast: ~5ms per embedding
- Multilingual: Supports 100+ languages
Chunking Strategy¶
- 1000 characters: ~200 words, good context window
- 200 overlap: Prevents losing context at boundaries
- Sentence-aware: Breaks at periods when possible
Cost Estimate (Monthly)¶
For 10,000 documents (~30,000 chunks):
- Embeddings: $0.00002 × 30,000 = $0.60
- Vector Search Index: $0.20/hour × 730 hours = $146
- Vector Search Queries: $0.001 × 10,000 queries = $10
- Total: ~$157/month
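The arithmetic above, spelled out (the rates are the document's example prices, not a current quote):

```python
# Example monthly cost for 10,000 documents (~30,000 chunks)
embedding_cost = 0.00002 * 30_000  # $0.60: per-chunk embedding generation
index_cost = 0.20 * 730            # $146.00: one index replica, 730 h/month
query_cost = 0.001 * 10_000        # $10.00: 10k search queries
total = embedding_cost + index_cost + query_cost
print(f"${total:.2f}/month")       # $156.60/month, i.e. ~$157
```

The index replica dominates the bill, which is why the reduction tips below focus on the index rather than embeddings or queries.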
Ways to reduce:
1. Use a smaller index (fewer replicas)
2. Use batch queries
3. Scale to zero when not in use (Cloud Run deployment)
Next Steps¶
After this phase works:
- Phase 4: RAG Pipeline
  - LangChain integration
  - Gemini 1.5 Pro for generation
  - Context window optimization
- Phase 5: UI
  - Next.js chat interface
  - Real-time search
  - Source citations
Troubleshooting¶
"Quota exceeded" error¶
Check your Vertex AI quotas (IAM & Admin → Quotas in the Cloud Console) and request an increase for the embedding model's requests-per-minute limit if needed.
"Index not ready" error¶
Wait 20-30 minutes after creation, then check the index status in the Cloud Console under Vertex AI → Vector Search.
"Permission denied" error¶
Grant AI Platform User role:
gcloud projects add-iam-policy-binding myindependent-ai \
    --member="user:stefan.binder89@gmail.com" \
    --role="roles/aiplatform.user"
Summary¶
You've now built:
- ✅ Embedding generation pipeline
- ✅ Text chunking system
- ✅ Vector storage in GCS
- ✅ Vertex AI Vector Search index
- ✅ Semantic search capability
Total time: ~1 hour (mostly waiting for index creation)
Next: Connect to Gemini for RAG!