Building a Production RAG System: Part 4 - Building the PDF Ingestion Pipeline

Series: Building a Production-Ready Textbook Q&A System with RAG
Part: 4 of 7 | Read Time: 25 minutes | Level: Intermediate

What We'll Build in This Part

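By the end of this tutorial, you'll have a working ingestion script that extracts text from a PDF, splits it into overlapping chunks, generates embeddings in batches, sanitizes the content, and stores the vectors in Supabase.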

Estimated time: 2-3 hours

The Ingestion Pipeline Overview

The ingestion pipeline transforms a PDF textbook into searchable vector embeddings:

PDF File (textbook.pdf)
    ↓
[1] Extract Text → "Chapter 1: Introduction to JavaScript..."
    ↓
[2] Chunk Text → ["JavaScript is a...", "Functions in JS...", ...]
    ↓
[3] Generate Embeddings → [[0.234, -0.567, ...], [0.891, ...], ...]
    ↓
[4] Sanitize Content → Remove null bytes, control characters
    ↓
[5] Store in Supabase → document_chunks table with vectors
    ↓
✅ Searchable Knowledge Base
            
Why each step matters:
  1. Extract: Get raw text from PDF pages
  2. Chunk: Break into digestible pieces (1000 chars)
  3. Embed: Convert text to semantic vectors (1536D)
  4. Sanitize: Clean data to prevent database errors
  5. Store: Save with proper pgvector format
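
The five stages can be wired together roughly as follows. This is a minimal sketch, not the series' actual entry point: `runPipeline`, `PipelineSteps`, and `Chunk` are hypothetical names, and each stage is passed in as a function so the flow can be read (and tested) on its own.

```typescript
// Hypothetical chunk shape for this sketch
interface Chunk {
  content: string
  pageNumber: number
  chunkIndex: number
}

// Each pipeline stage is injected as a function (all names are illustrative)
interface PipelineSteps {
  extract: (filePath: string) => Promise<string>
  chunk: (text: string) => Chunk[]
  embed: (chunks: Chunk[]) => Promise<number[][]>
  sanitize: (content: string) => string
  store: (chunks: Chunk[], embeddings: number[][]) => Promise<void>
}

// Runs the stages in the order shown in the diagram and returns the chunk count
async function runPipeline(filePath: string, steps: PipelineSteps): Promise<number> {
  const text = await steps.extract(filePath) // [1] Extract
  const chunks = steps.chunk(text) // [2] Chunk
  const embeddings = await steps.embed(chunks) // [3] Embed
  const cleaned = chunks.map(c => ({ ...c, content: steps.sanitize(c.content) })) // [4] Sanitize
  await steps.store(cleaned, embeddings) // [5] Store
  return cleaned.length
}
```

Passing the stages in as functions is only a convenience for illustration; the functions built in the following steps could be called directly in the same order.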

Step 1: Set Up Dependencies

Install Required Packages

npm install pdf-parse dotenv @supabase/supabase-js openai
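npm install --save-dev @types/node tsx

What we installed:
  1. pdf-parse: Extracts text from PDF files
  2. dotenv: Loads environment variables from .env.local
  3. @supabase/supabase-js: Supabase client for database access
  4. openai: Official OpenAI client, used here for embeddings
  5. @types/node and tsx: Node.js type definitions and the TypeScript runner used by the ingest script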

Step 2: Extract Text from PDF

Create scripts/ingest.ts with the complete extraction logic:

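import fs from 'fs'
import path from 'path'
import dotenv from 'dotenv'
import OpenAI from 'openai'
import { createClient } from '@supabase/supabase-js'

// Load environment variables
dotenv.config({ path: path.join(process.cwd(), '.env.local') })

// Fail fast if a credential is missing (this also narrows the types to string)
const { OPENAI_API_KEY, NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY } = process.env
if (!OPENAI_API_KEY || !NEXT_PUBLIC_SUPABASE_URL || !SUPABASE_SERVICE_ROLE_KEY) {
  throw new Error('Missing required environment variables in .env.local')
}

// Initialize clients
const openai = new OpenAI({ apiKey: OPENAI_API_KEY })
const supabase = createClient(NEXT_PUBLIC_SUPABASE_URL, SUPABASE_SERVICE_ROLE_KEY)

// Shape of a chunk as it moves through the pipeline
interface DocumentChunk {
  content: string
  pageNumber: number
  chunkIndex: number
}

// Extract text from PDF
async function extractTextFromPDF(filePath: string): Promise<string> {
  console.log('📖 Extracting text from PDF...')
  // Import the library file directly; the package entry point can fall into
  // a debug mode that tries to read a bundled test PDF
  const { default: pdfParse } = await import('pdf-parse/lib/pdf-parse.js')
  const dataBuffer = fs.readFileSync(filePath)
  const data = await pdfParse(dataBuffer)

  console.log(`✅ Extracted ${data.numpages} pages`)
  return data.text
}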

Step 3: Implement Smart Chunking

The chunking strategy is critical for good retrieval quality:

const CHUNK_SIZE = 1000 // Characters per chunk
const CHUNK_OVERLAP = 200 // Character overlap

function chunkText(
  text: string,
  pageNumber: number,
  startChunkIndex: number = 0
): DocumentChunk[] {
  const chunks: DocumentChunk[] = []
  let startIndex = 0
  let chunkIndex = startChunkIndex

  while (startIndex < text.length) {
    const endIndex = Math.min(startIndex + CHUNK_SIZE, text.length)
    const chunk = text.slice(startIndex, endIndex)

    if (chunk.trim().length > 0) {
      chunks.push({
        content: chunk.trim(),
        pageNumber,
        chunkIndex,
      })
      chunkIndex++
    }

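    // Stop once the window reaches the end, so the tail is not emitted twice
    if (endIndex === text.length) break
    startIndex += CHUNK_SIZE - CHUNK_OVERLAP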
  }

  return chunks
}
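
Why this chunking strategy?
  1. Fixed size (1000 chars): Keeps chunks uniform and well under the embedding model's input limit
  2. Overlap (200 chars): Repeats the last 200 characters of each chunk at the start of the next, so text cut at a boundary keeps its surrounding context
  3. Global index: startChunkIndex carries the running chunk count from page to page, so chunk_index stays unique within a document and inserts don't hit duplicate key errors
  4. Skip empty: Avoids wasting embedding calls on whitespace-only chunks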
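
To see how size and overlap interact, the sliding window can be reduced to its start and end offsets. A minimal sketch under stated assumptions: `chunkWindows` is a hypothetical helper (not part of the ingestion script), and it stops as soon as a window reaches the end of the text so the tail is not emitted a second time.

```typescript
// Returns [start, end) character offsets for each chunk window
function chunkWindows(length: number, size = 1000, overlap = 200): Array<[number, number]> {
  if (overlap >= size) throw new Error('overlap must be smaller than size')
  const windows: Array<[number, number]> = []
  const step = size - overlap // with the defaults, each window starts 800 chars after the previous one
  for (let start = 0; start < length; start += step) {
    const end = Math.min(start + size, length)
    windows.push([start, end])
    if (end === length) break // last window reached the end of the text
  }
  return windows
}

// A 2,500-character page yields three windows; neighbors share 200 characters
console.log(chunkWindows(2500)) // [[0, 1000], [800, 1800], [1600, 2500]]
```

The guard on `overlap >= size` matters because a step of zero or less would never advance the window.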

Step 4: Generate Embeddings in Batches

const BATCH_SIZE = 100

async function generateEmbeddings(chunks: DocumentChunk[]): Promise<number[][]> {
  console.log(`🤖 Generating embeddings for ${chunks.length} chunks...`)
  const embeddings: number[][] = []

  for (let i = 0; i < chunks.length; i += BATCH_SIZE) {
    const batch = chunks.slice(i, i + BATCH_SIZE)
    const batchTexts = batch.map(chunk => chunk.content)

    const response = await openai.embeddings.create({
      model: 'text-embedding-ada-002',
      input: batchTexts,
    })

    embeddings.push(...response.data.map(item => item.embedding))
    console.log(`  ⏳ Progress: ${i + batch.length}/${chunks.length}`)

    await new Promise(resolve => setTimeout(resolve, 100))
  }

  return embeddings
}
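
Batching strategy:
  1. 100 chunks per request: One API call embeds a whole batch, so 342 chunks need 4 requests instead of 342
  2. 100 ms pause between batches: Spaces out requests to stay clear of rate limits
  3. Progress logging: Shows how far a long ingestion run has gotten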
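
The batching loop is easier to reason about when the slicing is pulled out on its own. The sketch below uses a hypothetical generic helper, `toBatches`, to show how 342 chunks map onto four embedding requests at a batch size of 100.

```typescript
// Splits an array into consecutive batches of at most `size` items
function toBatches<T>(items: T[], size: number): T[][] {
  if (size <= 0) throw new Error('size must be positive')
  const batches: T[][] = []
  for (let i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size))
  }
  return batches
}

const chunkIds = Array.from({ length: 342 }, (_, i) => i)
const batchSizes = toBatches(chunkIds, 100).map(batch => batch.length)
console.log(batchSizes) // [100, 100, 100, 42]
```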

Step 5: Sanitize and Store Chunks

Critical fixes to avoid PostgreSQL errors:

function sanitizeContent(content: string): string {
  return content
    .replace(/\u0000/g, '') // Remove null bytes
    .replace(/[\x00-\x08\x0B\x0C\x0E-\x1F\x7F]/g, '') // Control chars
    .trim()
}

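async function storeChunks(
  documentId: string,
  chunks: DocumentChunk[],
  embeddings: number[][]
): Promise<void> {
  for (let i = 0; i < chunks.length; i += 50) {
    const batch = chunks.slice(i, i + 50).flatMap((chunk, idx) => {
      const sanitizedContent = sanitizeContent(chunk.content)
      if (!sanitizedContent) return [] // Skip chunks that are empty after cleaning

      // embeddings[] is parallel to chunks[], so offset by the batch start
      const embedding = embeddings[i + idx]
      const embeddingString = `[${embedding.join(',')}]`

      return [{
        document_id: documentId,
        content: sanitizedContent,
        page_number: chunk.pageNumber,
        chunk_index: chunk.chunkIndex,
        embedding: embeddingString,
      }]
    })

    if (batch.length === 0) continue

    // supabase-js returns errors instead of throwing, so check explicitly
    const { error } = await supabase.from('document_chunks').insert(batch)
    if (error) {
      throw new Error(`Failed to store batch starting at chunk ${i}: ${error.message}`)
    }
  }
}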

Critical fixes:
  1. Sanitize content: Remove null bytes, which PostgreSQL text columns reject
  2. pgvector string format: Send each embedding as a bracketed, comma-separated literal such as [0.1,0.2,0.3]
  3. Batch inserts: 50 at a time to avoid payload limits
  4. Skip empty chunks: Don't waste database space
Step 6: Run the Ingestion

Update package.json:

{
  "scripts": {
    "ingest": "tsx scripts/ingest.ts"
  }
}

Then run:

npm run ingest -- YOUR_USER_ID ./textbooks/sample.pdf "Sample Book" "Author"
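
The command passes four positional arguments. How the script reads them isn't shown in this part; a plausible sketch, assuming they arrive in the order used above and that the author is optional (`parseIngestArgs` and `IngestArgs` are hypothetical names):

```typescript
interface IngestArgs {
  userId: string
  pdfPath: string
  title: string
  author?: string
}

// Expects the arguments after the script name, i.e. process.argv.slice(2)
function parseIngestArgs(argv: string[]): IngestArgs {
  const [userId, pdfPath, title, author] = argv
  if (!userId || !pdfPath || !title) {
    throw new Error('Usage: npm run ingest -- <userId> <pdfPath> <title> [author]')
  }
  return { userId, pdfPath, title, author }
}

const args = parseIngestArgs(['YOUR_USER_ID', './textbooks/sample.pdf', 'Sample Book', 'Author'])
console.log(args.title) // Sample Book
```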

Expected Output

🚀 Starting PDF ingestion pipeline...
📖 Extracting text from PDF...
✅ Extracted 150 pages
✂️  Chunking text...
✅ Created 342 chunks
🤖 Generating embeddings for 342 chunks...
  ⏳ Progress: 100/342
  ⏳ Progress: 200/342
  ⏳ Progress: 300/342
  ⏳ Progress: 342/342
💾 Storing 342 chunks in database...
✅ Successfully stored 342 chunks

✅ Ingestion complete!
⏱️  Total time: 45.23s
📊 Chunks created: 342
💰 Estimated cost: $0.0342
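
The cost line in this sample run works out to a flat $0.0001 per chunk (342 × $0.0001 = $0.0342). That rate is inferred from the log above rather than taken from a price list, and OpenAI bills embeddings per token, so treat the sketch below (with the hypothetical helper `estimateCostUsd`) as a rough estimate only:

```typescript
// Rough cost estimate; the default per-chunk rate is an assumption inferred from the sample output
function estimateCostUsd(chunkCount: number, costPerChunkUsd = 0.0001): string {
  return `$${(chunkCount * costPerChunkUsd).toFixed(4)}`
}

console.log(`💰 Estimated cost: ${estimateCostUsd(342)}`) // 💰 Estimated cost: $0.0342
```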
            

Cost Optimization Strategies

Strategy 1: Cache Embeddings

Don't re-embed the same content:

async function getOrCreateEmbedding(content: string): Promise<number[]> {
  // Hash once and reuse the key for both lookup and insert
  const contentHash = hashContent(content)

  // Check cache first; maybeSingle() returns null data on a miss,
  // whereas single() reports a missing row as an error
  const { data } = await supabase
    .from('embedding_cache')
    .select('embedding')
    .eq('content_hash', contentHash)
    .maybeSingle()

  if (data) return JSON.parse(data.embedding)

  // Generate and cache
  const embedding = await generateEmbedding(content)
  await supabase.from('embedding_cache').insert({
    content_hash: contentHash,
    embedding: JSON.stringify(embedding),
  })

  return embedding
}
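
`hashContent` is not defined in this part. One way to write it, offered as an assumption rather than the series' actual implementation, is a SHA-256 digest from Node's built-in crypto module, which gives a fixed-length key that is identical for identical chunk text:

```typescript
import { createHash } from 'crypto'

// Hypothetical cache-key helper: hex-encoded SHA-256 of the chunk text
function hashContent(content: string): string {
  return createHash('sha256').update(content, 'utf8').digest('hex')
}

console.log(hashContent('JavaScript is a...').length) // 64
```

Hashing the exact text means that even a one-character difference produces a different key, so whether to normalize whitespace before hashing is a design choice left open here.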

Strategy 2: Skip Redundant Content

function shouldSkipChunk(content: string): boolean {
  const lowercaseContent = content.toLowerCase()

  if (/^\s*\d+\s*$/.test(content)) return true // Just page numbers
  if (content.trim().length < 50) return true // Too short
  if (lowercaseContent.includes('table of contents')) return true

  return false
}
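
A filter like this pays off when applied before the embedding step, so skipped chunks never reach the API. A sketch of that wiring, assuming a hypothetical helper `partitionChunks` that takes the predicate as a parameter:

```typescript
// Splits chunk texts into those worth embedding and those to drop
function partitionChunks(
  contents: string[],
  shouldSkip: (content: string) => boolean
): { kept: string[]; skipped: string[] } {
  const kept: string[] = []
  const skipped: string[] = []
  for (const content of contents) {
    if (shouldSkip(content)) skipped.push(content)
    else kept.push(content)
  }
  return { kept, skipped }
}

// Stand-in predicate for the demo: drop anything shorter than 50 characters
const demo = partitionChunks(['42', 'x'.repeat(60)], c => c.trim().length < 50)
console.log(demo.kept.length, demo.skipped.length) // 1 1
```

Keeping the skipped list around makes it easy to log how many embedding calls were avoided.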

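What We Accomplished

In this part, we built a script that extracts text from a PDF, splits it into overlapping 1000-character chunks, generates embeddings in batches of 100, sanitizes the content, and stores the chunks and their vectors in Supabase. We also covered two ways to cut embedding costs: caching embeddings and skipping redundant chunks.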

Homework Challenge

  1. Add duplicate detection: Check if document already exists
  2. Implement resume capability: Resume from last chunk on failure
  3. Add OCR support: Handle image-based PDFs using Tesseract
  4. Create batch ingestion: Process multiple PDFs in one command