Overview
Insight Flow is a web application that lets YouTube channel owners give their audience a way to ask questions against their video content. Users transcribe their YouTube videos, the transcriptions are chunked and embedded into a PostgreSQL vector database, and visitors can chat with an AI that synthesizes answers from the video content — complete with timestamped YouTube links back to the source material.
The stack is Next.js 15, TypeScript, Prisma with PostgreSQL and pgvector, tRPC, Inngest for async transcription pipelines, Google Gemini for embeddings and chat, LangChain for structured output parsing, and Clerk for authentication.
Try Insight Flow or explore the source code.
Architecture
Transcription Pipeline
Video transcription runs asynchronously through Inngest. When a user selects videos to transcribe, the system fires an Inngest event that processes them in configurable batch sizes. The pipeline supports multiple modes — full transcription with embeddings, transcription-only, re-transcription of existing videos, and single-video retry:
export const transcribeVideosHandler = inngest.createFunction(
{ id: "transcribe-videos-handler", retries: 3 },
{ event: "transcription/videos.submitted" },
async ({ event, step }) => {
const { youtubeVideos, userEmail, batchSize = 5 } = event.data;
await step.run("send-start-notification", async () => {
await inngest.send({
name: "transcription/notification.started",
data: {
userEmail,
videoCount: youtubeVideos.length,
type: "full-transcription",
},
});
});
const result = await step.run("execute-transcription", async () => {
return await transcribeVideos({ youtubeVideos, userEmail, batchSize });
});
return result;
},
);
Each transcription mode fires its own Inngest event, and a notification handler listens for start events to keep users informed about long-running jobs.
Embedding System
After transcription, text chunks are embedded using Google's Gemini embedding model. The system batches embeddings in groups of 100 with up to 5 concurrent API calls, using exponential backoff retry logic to handle rate limits:
export const getEmbeddings = async (text: string[]) => {
const batchSize = 100;
const maxConcurrent = 5;
const allEmbeddings: (number[] | undefined)[] = new Array(text.length);
const batches: string[][] = [];
for (let i = 0; i < text.length; i += batchSize) {
batches.push(text.slice(i, i + batchSize));
}
for (let i = 0; i < batches.length; i += maxConcurrent) {
const batchGroup = batches.slice(i, i + maxConcurrent);
const promises = batchGroup.map(async (batch, groupIndex) => {
return await retryWithBackoff(
async () => {
const response = await ai.models.embedContent({
model: "gemini-embedding-001",
contents: batch,
});
const batchEmbeddings = response.embeddings.map(
(embedding) => embedding.values,
);
batchEmbeddings.forEach((embedding, embeddingIndex) => {
allEmbeddings[startIndex + embeddingIndex] = embedding;
});
return batchEmbeddings;
},
3,
1000,
operationName,
);
});
await Promise.all(promises);
}
return allEmbeddings;
};
Transcript Chunking
Raw transcription segments are often too short for meaningful embeddings. The system merges consecutive chunks until each one spans at least 10 seconds of video content, producing denser text that captures more context per embedding:
function mergeTranscriptChunks(
transcript: Transcript,
minChunkDurationSeconds: number = 10,
): Transcript {
const mergedChunks: Transcript = [];
let currentChunk: TranscriptChunk | null = null;
let currentChunkStartTime: number = 0;
for (let i = 0; i < transcript.length; i++) {
const chunk = transcript[i];
const chunkTimestamp =
typeof chunk.timestamp === "number"
? chunk.timestamp
: convertTimestampToSeconds(chunk.timestamp.toString()) || 0;
if (currentChunk === null) {
currentChunk = {
timestamp: chunkTimestamp,
text: chunk.text,
embedding: null,
};
currentChunkStartTime = chunkTimestamp;
} else {
const chunkDuration = chunkTimestamp - currentChunkStartTime;
if (chunkDuration < minChunkDurationSeconds) {
// Merge: append text to current chunk
currentChunk.text += " " + chunk.text;
} else {
mergedChunks.push(currentChunk);
currentChunk = {
timestamp: chunkTimestamp,
text: chunk.text,
embedding: null,
};
currentChunkStartTime = chunkTimestamp;
}
}
}
if (currentChunk) mergedChunks.push(currentChunk);
return mergedChunks;
}
Vector Storage with pgvector
Prisma doesn't natively support PostgreSQL's vector type, so all embedding storage and retrieval uses raw SQL. The schema declares the embedding column as Unsupported("vector"), and inserts use $executeRaw with explicit ::vector casting:
export const appendEmbeddings = async ({ videoId, transcript }) => {
const embeddings = await getEmbeddings(transcript.map((t) => t.text));
for (const chunk of newTranscript) {
await prisma.$executeRaw`
INSERT INTO "TranscriptChunk" (
id, "timestampInSeconds", text, embedding, "videoId", "createdAt"
)
VALUES (
gen_random_uuid(),
${
typeof chunk.timestamp === "number"
? chunk.timestamp
: parseInt(chunk.timestamp.toString(), 10)
},
${chunk.text},
${chunk.embedding ? `[${chunk.embedding.join(",")}]` : null}::vector,
${videoId},
NOW()
)
`;
}
};
RAG Search
The search function finds relevant transcript chunks using cosine distance via pgvector's <=> operator. It embeds the user's query, then runs a vector similarity search filtered by the user's videos and optionally by channel or playlist:
export async function searchVideos(
userEmail: string,
queryText: string,
limit: number = 5,
channelHandles?: string[],
playlistIds?: string[],
): Promise<RetrievedChunk[]> {
const [queryEmbedding = []] = await getEmbeddings([queryText]);
const vectorLiteral = `[${queryEmbedding.join(",")}]`;
const query = `
SELECT
tc.text,
1 - (tc.embedding <=> '${vectorLiteral}'::vector) as score,
tc."timestampInSeconds",
tc."videoId",
v."youtubeId"
FROM "TranscriptChunk" tc
JOIN "Video" v ON tc."videoId" = v.id
WHERE ${whereClause}
ORDER BY tc.embedding <=> '${vectorLiteral}'::vector
LIMIT ${limit}
`;
return await prisma.$queryRawUnsafe(query, ...params);
}
The results include the YouTube video ID and timestamp, so the chat response can link directly to the relevant moment in the video.
Chat System
The chat handler retrieves relevant chunks, assembles a structured prompt with XML-tagged context, and generates a response using Gemini. Previous messages from the conversation are included for multi-turn context:
const prompt = `You are a helpful assistant that synthesizes information
from multiple sources...
<context>
<user_message>${query}</user_message>
<relevant_documents>
${chunks
.map(
(chunk) => `
<document>
<relevance>${Math.round(chunk.score * 100)}%</relevance>
<youtube_url>
https://www.youtube.com/watch?v=${chunk.youtubeId}&t=${chunk.timestampInSeconds}s
</youtube_url>
<text>${chunk.text}</text>
</document>`,
)
.join("\n")}
</relevant_documents>
<previous_messages>
${previousMessages
.map(
(msg) => `
<message>
<role>${msg.role}</role>
<content>${msg.message}</content>
</message>`,
)
.join("\n")}
</previous_messages>
</context>`;
Notification System
Transcription is a long-running process, so the system uses event-driven notifications. When a transcription job starts, an Inngest event triggers a notification handler that creates a database record. The UI polls for new notifications so users can navigate away and come back when processing is done.
Difficult Parts
pgvector Raw SQL for Embedding Storage
The biggest friction point was working with pgvector through Prisma. Since Prisma has no native vector type, the schema uses Unsupported("vector") which means you can't use standard Prisma create/update operations for the embedding column. Every embedding insert and query requires raw SQL with $executeRaw or $queryRawUnsafe, and the vector values must be manually formatted as string literals with ::vector casting. This creates a split in the data access layer where most operations use Prisma's type-safe API but embedding operations bypass it entirely.
Batched Embedding Generation with Retry
Processing hundreds of transcript chunks through an embedding API requires batching and error handling. The Gemini embedding API has rate limits, so the system processes batches of 100 with up to 5 concurrent requests and wraps each batch in exponential backoff retry logic. The tricky part is tracking which embeddings correspond to which chunks when processing is parallelized — the system uses positional indexing into a pre-allocated array to place each batch's results at the correct offset.
RAG Prompt Assembly
Building effective RAG prompts required balancing several concerns: including enough retrieved context to answer the question, providing previous conversation history for multi-turn coherence, formatting sources with clickable YouTube timestamps, and instructing the model to cite sources inline. The XML-tagged prompt structure made it easier to separate these concerns and iterate on the prompt without breaking the overall format.
Transcript Chunk Merging
Naive per-segment embedding produces poor retrieval quality because individual transcript segments are often just a few words or a partial sentence. The merging algorithm walks through chunks sequentially, accumulating text until the time span exceeds 10 seconds. This improved retrieval relevance significantly because each embedding now captures a meaningful unit of thought rather than a sentence fragment.
Try Insight Flow or explore the source code.