import '../src/config.js';
import { MongoClient } from 'mongodb';
import { getEmbedding, getEmbeddingWithHeaders } from '../src/get-embeddings.js';
import RateLimiter from '../src/rate-limiter.js';
import fs from 'fs';
import { parse } from 'csv-parse';
import path from 'path';
import { fileURLToPath } from 'url';
// ESM modules don't provide __filename/__dirname, so derive them from import.meta.url
const __filename = fileURLToPath(import.meta.url);
const __dirname = path.dirname(__filename);
// Configuration
const CHUNK_DIR = path.join(__dirname, 'chunks');
const DB_NAME = "rag_db";
const COLLECTION_NAME = "inventory";
const EMBEDDING_MODEL = "text-embedding-3-large"; // or "text-embedding-3-small"
const BATCH_SIZE = 10; // Reduced for rate limiting - process 10 at a time
// Rate limiter configuration based on model
const RATE_LIMIT_CONFIG = {
"text-embedding-3-small": {
maxRequestsPerMinute: 400, // Safe under 500 free-tier limit
maxTokensPerMinute: 1000000
},
"text-embedding-3-large": {
maxRequestsPerMinute: 400,
maxTokensPerMinute: 1000000
}
};
// MongoDB connection
const client = new MongoClient(process.env.ATLAS_CONNECTION_STRING);
// Create rate limiter
const rateLimiter = new RateLimiter(RATE_LIMIT_CONFIG[EMBEDDING_MODEL]);
// Colors
const colors = {
reset: '\x1b[0m',
bright: '\x1b[1m',
green: '\x1b[32m',
blue: '\x1b[34m',
yellow: '\x1b[33m',
red: '\x1b[31m',
cyan: '\x1b[36m'
};
// Create structured embedding text from product
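// The "Field: value" layout gives the embedding model explicit structure; in practice
// this tends to help queries that target a specific field (e.g. a SKU or manufacturer).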
function createEmbeddingText(product) {
const parts = [];
if (product.Name) parts.push(`Name: ${product.Name}`);
if (product.Description) parts.push(`Description: ${product.Description}`);
if (product.Category) parts.push(`Category: ${product.Category}`);
if (product.Manufacturer) parts.push(`Manufacturer: ${product.Manufacturer}`);
if (product['Manufacturer Part Number']) parts.push(`Manufacturer Part Number: ${product['Manufacturer Part Number']}`);
if (product.Supplier) parts.push(`Supplier: ${product.Supplier}`);
if (product['Supplier Part Number']) parts.push(`Supplier Part Number: ${product['Supplier Part Number']}`);
if (product.SKU) parts.push(`SKU: ${product.SKU}`);
if (product['cTag UID']) parts.push(`cTag UID: ${product['cTag UID']}`);
if (product['Internal UID']) parts.push(`Internal UID: ${product['Internal UID']}`);
return parts.join('\n');
}
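// Example (hypothetical record): { Name: 'Hex Bolt M8', Category: 'Fasteners' } yields:
//   Name: Hex Bolt M8
//   Category: Fasteners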
// Transform CSV row to MongoDB document
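// Missing columns fall back to empty strings so every document shares the same shape.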
function transformToDocument(row) {
return {
internalUid: row['Internal UID'] || '',
name: row['Name'] || '',
description: row['Description'] || '',
category: row['Category'] || '',
supplier: row['Supplier'] || '',
supplierPartNumber: row['Supplier Part Number'] || '',
manufacturer: row['Manufacturer'] || '',
manufacturerPartNumber: row['Manufacturer Part Number'] || '',
sku: row['SKU'] || '',
cTagUid: row['cTag UID'] || '',
price: row['Price'] || '',
language: row['Language'] || 'EN',
image_url: row['image_url'] || ''
};
}
// Process a batch of records with rate limiting
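// Returns { inserted, errors, time }; records that exhaust their retries are counted
// as errors rather than aborting the batch.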
async function processBatch(collection, records, chunkNum, batchNum) {
const batchStartTime = Date.now();
try {
// Generate embeddings ONE AT A TIME with rate limiting
// This is slower but respects rate limits
const results = [];
for (let idx = 0; idx < records.length; idx++) {
const record = records[idx];
let retries = 0;
const maxRetries = 3;
while (retries <= maxRetries) {
try {
// Wait if needed based on rate limits
await rateLimiter.waitIfNeeded();
const text = createEmbeddingText(record);
// Try to get embedding with headers for rate limit tracking
let embedding, headers;
try {
const response = await getEmbeddingWithHeaders(text, EMBEDDING_MODEL);
embedding = response.embedding;
headers = response.headers;
// Update rate limiter with response headers
rateLimiter.onSuccess(headers);
} catch (headerError) {
// Fallback to simple version if header version fails
if (headerError.isRateLimit) {
throw headerError; // Re-throw rate limit errors
}
embedding = await getEmbedding(text, EMBEDDING_MODEL);
rateLimiter.onSuccess({});
}
results.push({ record, embedding, text });
break; // Success, exit retry loop
} catch (error) {
// Handle rate limit errors
if (error.isRateLimit || error.status === 429) {
retries++;
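// Honor the server-supplied Retry-After when present; otherwise back off exponentially (10s, 20s, 40s)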
const retryAfter = error.retryAfter || Math.pow(2, retries) * 5;
console.log(`\n${colors.yellow}⏳ Rate limit hit! Waiting ${retryAfter}s before retry ${retries}/${maxRetries}...${colors.reset}`);
rateLimiter.onRateLimit(retryAfter);
if (retries <= maxRetries) {
await new Promise(resolve => setTimeout(resolve, retryAfter * 1000));
} else {
console.error(`\n${colors.red}❌ Max retries exceeded for record ${idx}${colors.reset}`);
results.push(null);
break;
}
} else {
// Other error
console.error(`\n${colors.red}❌ Embedding error for record ${idx}:${colors.reset}`, error.message);
rateLimiter.onError();
results.push(null);
break;
}
}
}
}
const validResults = results.filter(r => r !== null);
// Prepare documents for insertion
const documents = validResults.map(result => ({
...transformToDocument(result.record),
embedding: result.embedding,
embeddingText: result.text,
embeddingModel: EMBEDDING_MODEL,
embeddingUpdatedAt: new Date(),
importedChunk: chunkNum,
importedBatch: batchNum,
importedAt: new Date()
}));
// Insert into MongoDB (upsert based on internalUid or name+sku)
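// internalUid is the preferred natural key; name+sku is a best-effort fallback. Upserting
// makes re-imports idempotent: re-running a chunk updates documents instead of duplicating them.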
if (documents.length > 0) {
const bulkOps = documents.map(doc => ({
updateOne: {
filter: doc.internalUid ?
{ internalUid: doc.internalUid } :
{ name: doc.name, sku: doc.sku },
update: { $set: doc },
upsert: true
}
}));
await collection.bulkWrite(bulkOps);
}
const batchTime = Date.now() - batchStartTime;
return {
inserted: documents.length,
errors: records.length - validResults.length,
time: batchTime
};
} catch (error) {
console.error(`\n${colors.red}❌ Batch processing error:${colors.reset}`, error.message);
throw error;
}
}
// Import a single chunk
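// Reads chunk_NNNN.csv, embeds each record, and upserts the results; returns per-chunk
// stats, or null if the file is missing.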
async function importChunk(chunkNumber) {
const chunkFileName = `chunk_${String(chunkNumber).padStart(4, '0')}.csv`;
const chunkPath = path.join(CHUNK_DIR, chunkFileName);
if (!fs.existsSync(chunkPath)) {
console.error(`${colors.red}❌ Chunk file not found: ${chunkFileName}${colors.reset}`);
return null;
}
console.log(`\n${colors.cyan}${colors.bright}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${colors.reset}`);
console.log(`${colors.bright}Importing Chunk ${chunkNumber}${colors.reset}`);
console.log(`${colors.cyan}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${colors.reset}\n`);
console.log(`${colors.blue}📂 File:${colors.reset} ${chunkFileName}`);
console.log(`${colors.blue}🤖 Model:${colors.reset} ${EMBEDDING_MODEL}`);
console.log(`${colors.blue}📦 Batch size:${colors.reset} ${BATCH_SIZE}`);
console.log(`${colors.blue}⏱️ Rate limit:${colors.reset} ${RATE_LIMIT_CONFIG[EMBEDDING_MODEL].maxRequestsPerMinute} req/min\n`);
const startTime = Date.now();
try {
await client.connect();
const db = client.db(DB_NAME);
const collection = db.collection(COLLECTION_NAME);
// Read and process chunk
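// columns: true keys each row by the header row; relax_column_count tolerates rows
// with a ragged column count instead of throwing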
const records = [];
const parser = fs.createReadStream(chunkPath).pipe(
parse({
columns: true,
skip_empty_lines: true,
relax_column_count: true
})
);
for await (const record of parser) {
records.push(record);
}
console.log(`${colors.green}✅ Loaded ${records.length} records from chunk${colors.reset}\n`);
// Process in batches
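// Batches run strictly in sequence so the rate limiter observes every request;
// parallelizing would risk blowing past the per-minute cap.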
let totalInserted = 0;
let totalErrors = 0;
const totalBatches = Math.ceil(records.length / BATCH_SIZE);
for (let i = 0; i < records.length; i += BATCH_SIZE) {
const batch = records.slice(i, i + BATCH_SIZE);
const batchNum = Math.floor(i / BATCH_SIZE) + 1;
process.stdout.write(`${colors.yellow}⏳ Processing batch ${batchNum}/${totalBatches} (${i + 1}-${Math.min(i + BATCH_SIZE, records.length)})...${colors.reset}`);
const result = await processBatch(collection, batch, chunkNumber, batchNum);
totalInserted += result.inserted;
totalErrors += result.errors;
process.stdout.write(`\r${colors.green}✅ Batch ${batchNum}/${totalBatches} complete: ${result.inserted} inserted, ${result.errors} errors (${result.time}ms)${colors.reset}\n`);
}
const totalTime = Date.now() - startTime;
const avgTimePerRecord = records.length > 0 ? (totalTime / records.length).toFixed(0) : '0';
// Get rate limiter statistics
const rateLimiterStats = rateLimiter.getStats();
console.log(`\n${colors.green}${colors.bright}✅ Chunk ${chunkNumber} import complete!${colors.reset}`);
console.log(`${colors.blue}📊 Total records:${colors.reset} ${records.length}`);
console.log(`${colors.blue}✅ Inserted:${colors.reset} ${totalInserted}`);
console.log(`${colors.blue}❌ Errors:${colors.reset} ${totalErrors}`);
console.log(`${colors.blue}⏱️ Total time:${colors.reset} ${(totalTime / 1000 / 60).toFixed(1)} minutes`);
console.log(`${colors.blue}⚡ Avg time/record:${colors.reset} ${avgTimePerRecord}ms`);
console.log(`\n${colors.cyan}📊 Rate Limiter Stats:${colors.reset}`);
console.log(`${colors.blue} API requests:${colors.reset} ${rateLimiterStats.totalRequests}`);
console.log(`${colors.blue} Rate limit hits:${colors.reset} ${rateLimiterStats.rateLimitHits}`);
console.log(`${colors.blue} Total wait time:${colors.reset} ${rateLimiterStats.totalWaitTimeSeconds}s`);
if (rateLimiterStats.remainingRequests != null) { // != null also covers undefined
console.log(`${colors.blue} Remaining requests:${colors.reset} ${rateLimiterStats.remainingRequests}`);
}
console.log();
return {
chunkNumber,
totalRecords: records.length,
inserted: totalInserted,
errors: totalErrors,
time: totalTime
};
} catch (error) {
console.error(`\n${colors.red}❌ Import error:${colors.reset}`, error.message);
throw error;
}
}
// Import multiple chunks
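// Runs chunks sequentially with a short pause between them; the shared rate limiter
// carries its request window across chunk boundaries.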
async function importChunks(startChunk, endChunk) {
console.log(`${colors.cyan}${colors.bright}╔═══════════════════════════════════════════════════════╗${colors.reset}`);
console.log(`${colors.cyan}${colors.bright}║               MONGODB CHUNK IMPORT TOOL               ║${colors.reset}`);
console.log(`${colors.cyan}${colors.bright}╚═══════════════════════════════════════════════════════╝${colors.reset}\n`);
console.log(`${colors.blue}📦 Chunks to import:${colors.reset} ${startChunk} to ${endChunk}`);
console.log(`${colors.blue}🤖 Embedding model:${colors.reset} ${EMBEDDING_MODEL}`);
console.log(`${colors.blue}📐 Dimensions:${colors.reset} ${EMBEDDING_MODEL === 'text-embedding-3-large' ? 3072 : 1536}`);
console.log(`${colors.blue}⏱️ Rate limit:${colors.reset} ${RATE_LIMIT_CONFIG[EMBEDDING_MODEL].maxRequestsPerMinute} req/min`);
const overallStartTime = Date.now();
const results = [];
try {
for (let chunkNum = startChunk; chunkNum <= endChunk; chunkNum++) {
const result = await importChunk(chunkNum);
if (result) {
results.push(result);
} else {
console.log(`${colors.yellow}⚠️ Skipping chunk ${chunkNum} (not found)${colors.reset}`);
}
// Pause briefly between chunks so the rate-limit window can recover
if (chunkNum < endChunk) {
console.log(`${colors.cyan}⏸️ Pausing 5 seconds between chunks...${colors.reset}\n`);
await new Promise(resolve => setTimeout(resolve, 5000));
}
}
// Overall summary
const totalTime = Date.now() - overallStartTime;
const totalRecords = results.reduce((sum, r) => sum + r.totalRecords, 0);
const totalInserted = results.reduce((sum, r) => sum + r.inserted, 0);
const totalErrors = results.reduce((sum, r) => sum + r.errors, 0);
// Get final rate limiter stats
const finalStats = rateLimiter.getStats();
console.log(`${colors.cyan}${colors.bright}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${colors.reset}`);
console.log(`${colors.green}${colors.bright}✅ ALL CHUNKS IMPORTED!${colors.reset}\n`);
console.log(`${colors.blue}📊 Import Summary:${colors.reset}`);
console.log(` Chunks processed: ${results.length}`);
console.log(` Total records: ${totalRecords.toLocaleString()}`);
console.log(` Successfully inserted: ${totalInserted.toLocaleString()}`);
console.log(` Errors: ${totalErrors}`);
console.log(` Total time: ${(totalTime / 1000 / 60).toFixed(1)} minutes`);
console.log(`\n${colors.cyan}📊 Rate Limiter Summary:${colors.reset}`);
console.log(` Total API requests: ${finalStats.totalRequests.toLocaleString()}`);
console.log(` Rate limit hits: ${finalStats.rateLimitHits}`);
console.log(` Total wait time: ${finalStats.totalWaitTimeSeconds}s (${(finalStats.totalWaitTimeSeconds / 60).toFixed(1)} min)`);
console.log(` Avg requests/min: ${((finalStats.totalRequests / (totalTime / 60000)).toFixed(0))}`);
console.log(`${colors.cyan}${colors.bright}━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━${colors.reset}\n`);
} catch (error) {
console.error(`${colors.red}❌ Import process failed:${colors.reset}`, error.message);
throw error;
} finally {
await client.close();
console.log(`${colors.yellow}🔌 Disconnected from MongoDB${colors.reset}\n`);
}
}
// List available chunks
function listChunks() {
console.log(`${colors.cyan}${colors.bright}Available Chunks:${colors.reset}\n`);
if (!fs.existsSync(CHUNK_DIR)) {
console.log(`${colors.red}❌ Chunks directory not found: ${CHUNK_DIR}${colors.reset}`);
console.log(`${colors.yellow}💡 Run: node tools/split-csv.js${colors.reset}\n`);
return [];
}
const chunks = fs.readdirSync(CHUNK_DIR)
.filter(f => f.startsWith('chunk_') && f.endsWith('.csv'))
.sort();
if (chunks.length === 0) {
console.log(`${colors.red}❌ No chunks found in ${CHUNK_DIR}${colors.reset}`);
console.log(`${colors.yellow}💡 Run: node tools/split-csv.js${colors.reset}\n`);
return [];
}
chunks.forEach((chunk, idx) => {
const chunkPath = path.join(CHUNK_DIR, chunk);
const stats = fs.statSync(chunkPath);
const sizeMB = (stats.size / (1024 * 1024)).toFixed(2);
console.log(` ${colors.bright}${idx + 1}.${colors.reset} ${chunk} (${sizeMB} MB)`);
});
console.log(`\n${colors.blue}📊 Total chunks: ${chunks.length}${colors.reset}\n`);
return chunks;
}
// Parse command line arguments
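// Returns an action descriptor: { action: 'help' | 'list' | 'error' | 'import' },
// plus mode/start/end or message as applicable.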
function parseArgs() {
const args = process.argv.slice(2);
if (args.length === 0 || args.includes('--help') || args.includes('-h')) {
return { action: 'help' };
}
if (args.includes('--list') || args.includes('-l')) {
return { action: 'list' };
}
const firstArg = args[0];
if (firstArg === 'all') {
return { action: 'import', mode: 'all' };
}
const startChunk = parseInt(firstArg, 10);
if (isNaN(startChunk)) {
return { action: 'error', message: 'Invalid chunk number' };
}
if (args.length === 1) {
return { action: 'import', mode: 'single', start: startChunk, end: startChunk };
}
const endChunk = parseInt(args[1], 10);
if (isNaN(endChunk)) {
return { action: 'error', message: 'Invalid end chunk number' };
}
return { action: 'import', mode: 'range', start: startChunk, end: endChunk };
}
// Main function
async function main() {
const parsed = parseArgs();
if (parsed.action === 'help') {
console.log(`${colors.cyan}${colors.bright}MongoDB Chunk Import Tool${colors.reset}\n`);
console.log('Import CSV chunks into MongoDB with embeddings\n');
console.log('Usage:');
console.log(` ${colors.bright}node tools/import-chunk.js <chunk>${colors.reset} Import single chunk`);
console.log(` ${colors.bright}node tools/import-chunk.js <start> <end>${colors.reset} Import chunk range`);
console.log(` ${colors.bright}node tools/import-chunk.js all${colors.reset} Import all chunks`);
console.log(` ${colors.bright}node tools/import-chunk.js --list${colors.reset} List available chunks`);
console.log('\nExamples:');
console.log(` ${colors.blue}node tools/import-chunk.js 1${colors.reset} # Import chunk 1`);
console.log(` ${colors.blue}node tools/import-chunk.js 1 5${colors.reset} # Import chunks 1-5`);
console.log(` ${colors.blue}node tools/import-chunk.js all${colors.reset} # Import all chunks`);
console.log('\nOptions:');
console.log(' --list, -l List available chunks');
console.log(' --help, -h Show this help\n');
console.log(`${colors.yellow}⚠️ Note: Make sure to run split-csv.js first!${colors.reset}\n`);
return;
}
if (parsed.action === 'list') {
listChunks();
return;
}
if (parsed.action === 'error') {
console.error(`${colors.red}❌ Error: ${parsed.message}${colors.reset}\n`);
console.log(`${colors.yellow}💡 Run with --help for usage information${colors.reset}\n`);
return;
}
if (parsed.action === 'import') {
// List available chunks
const availableChunks = listChunks();
if (availableChunks.length === 0) {
return;
}
// Determine chunk range
let startChunk, endChunk;
if (parsed.mode === 'all') {
startChunk = 1;
endChunk = availableChunks.length;
console.log(`${colors.yellow}⚠️ About to import ALL ${availableChunks.length} chunks!${colors.reset}`);
} else {
startChunk = parsed.start;
endChunk = parsed.end;
}
// Validate chunk numbers
if (startChunk < 1 || startChunk > availableChunks.length) {
console.error(`${colors.red}❌ Invalid start chunk: ${startChunk}${colors.reset}`);
console.log(`${colors.yellow}Available chunks: 1 to ${availableChunks.length}${colors.reset}\n`);
return;
}
if (endChunk < startChunk || endChunk > availableChunks.length) {
console.error(`${colors.red}❌ Invalid end chunk: ${endChunk}${colors.reset}`);
console.log(`${colors.yellow}Available chunks: 1 to ${availableChunks.length}${colors.reset}\n`);
return;
}
// Estimate cost and time
const estimatedRecords = (endChunk - startChunk + 1) * 50000; // rough: assumes ~50,000 records per chunk
const estimatedCost = (estimatedRecords / 1000000) * 0.13; // $0.13 per 1M tokens (text-embedding-3-large), using record count as a coarse token proxy
const estimatedMinutes = (estimatedRecords * 0.15) / 60; // ~150 ms (0.15 s) per record, matching the 400 req/min pacing
console.log(`${colors.yellow}📊 Estimate:${colors.reset}`);
console.log(` Records: ~${estimatedRecords.toLocaleString()}`);
console.log(` Time: ~${estimatedMinutes.toFixed(0)} minutes`);
console.log(` API Cost: ~$${estimatedCost.toFixed(2)}\n`);
console.log(`${colors.yellow}⚠️ This will generate embeddings and insert into MongoDB${colors.reset}`);
console.log(`${colors.yellow}⚠️ Press Ctrl+C to cancel, or wait 5 seconds to continue...${colors.reset}\n`);
await new Promise(resolve => setTimeout(resolve, 5000));
// Import chunks
await importChunks(startChunk, endChunk);
}
}
// Handle graceful shutdown
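// Ctrl+C closes the MongoDB client before exiting; a batch in flight is abandoned,
// but the upsert logic makes a re-run safe.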
process.on('SIGINT', async () => {
console.log(`\n\n${colors.yellow}⚠️ Process interrupted. Cleaning up...${colors.reset}`);
await client.close();
process.exit(0);
});
// Run
main().catch(async (error) => {
console.error(`${colors.red}💥 Fatal error:${colors.reset}`, error);
await client.close();
process.exit(1);
});