Skip to content

Phase 3: Embedding & Vector Pipeline - Implementation Guide

Overview

This guide walks you through implementing the embedding pipeline that converts imported data into searchable vector embeddings.

What We're Building

Flow: GCS Import → Chunking → Embedding Generation → Vector Storage → Search

┌─────────────┐      ┌──────────────┐      ┌───────────────┐      ┌──────────────┐
│  GCS Bucket │ ───▶ │   Chunking   │ ───▶ │  Vertex AI    │ ───▶ │ Vector DB    │
│  (Imports)  │      │  (Text Split)│      │  (Embeddings) │      │ (Search Index│
└─────────────┘      └──────────────┘      └───────────────┘      └──────────────┘

Step 1: Install Dependencies

cd /Users/stefan/projects/myindependent-ai
pip install google-cloud-aiplatform

What this does: Installs Vertex AI SDK for generating embeddings.


Step 2: Test the Embedding Processor Locally

python embedding/processor.py importer-gmail/20260131_221352_0000.json

What happens: 1. Reads the email JSON from GCS 2. Extracts text content 3. Splits into 1000-character chunks (with 200-char overlap) 4. Sends chunks to Vertex AI's text-embedding-004 model 5. Gets back 768-dimensional vectors 6. Stores embeddings in GCS at _embeddings/ folder

Expected output:

{
  "blob_name": "importer-gmail/20260131_221352_0000.json",
  "chunks": 2,
  "embeddings_count": 2,
  "embeddings_path": "gs://myindependent-ai-data/_embeddings/...",
  "success": true
}


Step 3: Process All Imported Emails

Create a batch processor script:

# Create batch processing script
cat > scripts/process_all_embeddings.py << 'EOF'
from google.cloud import storage
from embedding.processor import EmbeddingProcessor

# Initialize
processor = EmbeddingProcessor()
client = storage.Client(project='myindependent-ai')
bucket = client.bucket('myindependent-ai-data')

# Get all Gmail imports
blobs = list(bucket.list_blobs(prefix='importer-gmail/', delimiter='/'))
json_files = [b.name for b in blobs if b.name.endswith('.json') and '.gitkeep' not in b.name]

print(f"Found {len(json_files)} files to process")

# Process in batch
result = processor.process_batch(json_files)

print(f"\\nResults:")
print(f"  Total: {result['total']}")
print(f"  Successful: {result['successful']}")
print(f"  Failed: {result['failed']}")
EOF

# Run it
python scripts/process_all_embeddings.py

What this does: Processes all 100 Gmail imports, generating ~200-300 embedding vectors total.

Time: ~2-3 minutes for 100 emails


Step 4: Create Vertex AI Vector Search Index

Now we'll create a searchable index from the embeddings:

# Create index creation script
cat > scripts/create_vector_index.py << 'EOF'
from google.cloud import aiplatform
from google.cloud import storage
import json

PROJECT_ID = 'myindependent-ai'
REGION = 'europe-west3'
DISPLAY_NAME = 'myindependent-ai-index'

# Initialize
aiplatform.init(project=PROJECT_ID, location=REGION)

# Create index
index = aiplatform.MatchingEngineIndex.create_tree_ah_index(
    display_name=DISPLAY_NAME,
    dimensions=768,  # text-embedding-004 dimensions
    approximate_neighbors_count=10,
    distance_measure_type="DOT_PRODUCT_DISTANCE",
    leaf_node_embedding_count=500,
    leaf_nodes_to_search_percent=5,
)

print(f"Index created: {index.resource_name}")
print(f"Index ID: {index.name}")

# Save index ID for later use
with open('vector_index_id.txt', 'w') as f:
    f.write(index.name)

print("\\nIndex is being created (this takes 20-30 minutes)...")
print("Check status at: https://console.cloud.google.com/vertex-ai/matching-engine/indexes")
EOF

python scripts/create_vector_index.py

What this does: - Creates a Vector Search index optimized for semantic similarity - Uses Tree-AH algorithm (fast, approximate nearest neighbor search) - Configured for 768-dimensional embeddings - Takes 20-30 minutes to build

Note: The index creation is async. You'll get an ID immediately, but the index builds in the background.


Step 5: Deploy Index Endpoint

Once the index is ready (check console), deploy it:

cat > scripts/deploy_index_endpoint.py << 'EOF'
from google.cloud import aiplatform

PROJECT_ID = 'myindependent-ai'
REGION = 'europe-west3'

aiplatform.init(project=PROJECT_ID, location=REGION)

# Read index ID
with open('vector_index_id.txt') as f:
    index_id = f.read().strip()

# Create endpoint
endpoint = aiplatform.MatchingEngineIndexEndpoint.create(
    display_name="myindependent-ai-endpoint",
    public_endpoint_enabled=True,
)

print(f"Endpoint created: {endpoint.resource_name}")

# Deploy index to endpoint
deployed_index = endpoint.deploy_index(
    index=index_id,
    deployed_index_id="myindependent_ai_deployed",
    machine_type="e2-standard-2",
    min_replica_count=1,
    max_replica_count=1,
)

print(f"\\nIndex deployed!")
print(f"Endpoint ID: {endpoint.name}")

# Save for later
with open('vector_endpoint_id.txt', 'w') as f:
    f.write(endpoint.name)
EOF

python scripts/deploy_index_endpoint.py

What this does: - Creates a public endpoint (queryable via API) - Deploys the index to the endpoint - Uses e2-standard-2 machine (small, cost-effective) - Takes 5-10 minutes to deploy


cat > scripts/test_search.py << 'EOF'
from google.cloud import aiplatform
from vertexai.language_models import TextEmbeddingModel

PROJECT_ID = 'myindependent-ai'
REGION = 'europe-west3'

aiplatform.init(project=PROJECT_ID, location=REGION)

# Load endpoint
with open('vector_endpoint_id.txt') as f:
    endpoint_id = f.read().strip()

endpoint = aiplatform.MatchingEngineIndexEndpoint(endpoint_id)

# Generate query embedding
model = TextEmbeddingModel.from_pretrained("text-embedding-004")
query = "security alerts from Google"
query_embedding = model.get_embeddings([query])[0].values

# Search
matches = endpoint.find_neighbors(
    deployed_index_id="myindependent_ai_deployed",
    queries=[query_embedding],
    num_neighbors=5
)

print(f"Query: {query}\\n")
print("Top 5 matches:")
for idx, match in enumerate(matches[0], 1):
    print(f"{idx}. Distance: {match.distance:.4f}")
    print(f"   ID: {match.id}")
    print(f"   Metadata: {match.restricts}")
    print()
EOF

python scripts/test_search.py

Expected output:

Query: security alerts from Google

Top 5 matches:
1. Distance: 0.8523
   ID: importer-gmail/20260131_221352_0000_chunk_0
   Metadata: {'subject': 'Security alert', 'sender': 'Google'}

2. Distance: 0.7812
   ID: importer-gmail/20260131_221352_0034_chunk_0
   ...


Architecture Decisions Explained

  • Fully managed: No infrastructure to maintain
  • Scales automatically: Handles millions of vectors
  • Fast: Sub-100ms queries
  • Integrated: Works seamlessly with Vertex AI embeddings
  • Cost-effective: Pay only for what you use

Why text-embedding-004?

  • Best quality: Latest Google model (Jan 2024)
  • 768 dimensions: Good balance of quality vs. size
  • Fast: ~5ms per embedding
  • Multilingual: Supports 100+ languages

Chunking Strategy

  • 1000 characters: ~200 words, good context window
  • 200 overlap: Prevents losing context at boundaries
  • Sentence-aware: Breaks at periods when possible

Cost Estimate (Monthly)

For 10,000 documents (~30,000 chunks):

  • Embeddings: $0.00002 × 30,000 = $0.60
  • Vector Search Index: $0.20/hour × 730 hours = $146
  • Vector Search Queries: $0.001 × 10,000 queries = $10
  • Total: ~$157/month

Ways to reduce: 1. Use smaller index (fewer replicas) 2. Use batch queries 3. Scale to zero when not in use (Cloud Run deployment)


Next Steps

After this phase works:

  1. Phase 4: RAG Pipeline
  2. LangChain integration
  3. Gemini 1.5 Pro for generation
  4. Context window optimization

  5. Phase 5: UI

  6. Next.js chat interface
  7. Real-time search
  8. Source citation

Troubleshooting

"Quota exceeded" error

gcloud services enable aiplatform.googleapis.com --project=myindependent-ai

"Index not ready" error

Wait 20-30 minutes after creation. Check status:

gcloud ai indexes list --region=europe-west3

"Permission denied" error

Grant AI Platform User role:

gcloud projects add-iam-policy-binding myindependent-ai \\
  --member="user:stefan.binder89@gmail.com" \\
  --role="roles/aiplatform.user"


Summary

You've now built: ✅ Embedding generation pipeline ✅ Text chunking system ✅ Vector storage in GCS ✅ Vertex AI Vector Search index ✅ Semantic search capability

Total time: ~1 hour (mostly waiting for index creation)

Next: Connect to Gemini for RAG!