The Streams API lets you stream data from your tasks to the outside world in real time using the metadata system. This is particularly useful for streaming LLM outputs or any other real-time data.
How streams work
Streams use the metadata system to send data chunks in real time. You register a stream under a specific key using metadata.stream, and consumers can then subscribe to receive those chunks as they're emitted.
Basic streaming example
Here’s how to stream data from OpenAI in your task:
import { logger, metadata, task } from "@trigger.dev/sdk";
import OpenAI from "openai";

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

export type STREAMS = {
  openai: OpenAI.ChatCompletionChunk; // The type of the chunk is determined by the provider
};

export const myTask = task({
  id: "my-task",
  run: async (payload: { prompt: string }) => {
    const completion = await openai.chat.completions.create({
      messages: [{ role: "user", content: payload.prompt }],
      model: "gpt-3.5-turbo",
      stream: true,
    });

    // Register the stream with the key "openai"
    // This will "tee" the stream and send it to the metadata system
    const stream = await metadata.stream("openai", completion);

    let text = "";

    // You can read the returned stream as an async iterator
    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      // The type of the chunk is determined by the provider
      text += chunk.choices.map((choice) => choice.delta?.content).join("");
    }

    return { text };
  },
});
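The "tee" mentioned in the comment above can be pictured with the standard Web Streams API. The sketch below is not Trigger.dev's implementation; it only illustrates the idea that one source stream can feed two independent readers, one standing in for the metadata system and one for the task's own loop. The helpers `fromArray`, `collect`, and `teeDemo` are hypothetical names introduced for this illustration, and it assumes a runtime with a global `ReadableStream` (for example Node.js 18 or later).

```typescript
// Build a ReadableStream from a fixed list of items (illustration only).
function fromArray<T>(items: T[]): ReadableStream<T> {
  return new ReadableStream<T>({
    start(controller) {
      for (const item of items) controller.enqueue(item);
      controller.close();
    },
  });
}

// Read a stream to completion and return every chunk it produced.
async function collect<T>(stream: ReadableStream<T>): Promise<T[]> {
  const reader = stream.getReader();
  const chunks: T[] = [];
  while (true) {
    const { done, value } = await reader.read();
    if (done) break;
    chunks.push(value);
  }
  return chunks;
}

// Split one source into two branches: "forwarded" plays the role of the
// copy sent elsewhere, "local" is the copy the caller keeps reading.
async function teeDemo(items: string[]): Promise<{ sent: string[]; text: string }> {
  const [forwarded, local] = fromArray(items).tee();
  const [sent, localChunks] = await Promise.all([collect(forwarded), collect(local)]);
  return { sent, text: localChunks.join("") };
}
```

Both branches see every chunk, which is why the task can still iterate over the returned stream after registering it.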
Subscribing to streams from backend
You can subscribe to the stream using the runs.subscribeToRun method with .withStreams():
import { runs } from "@trigger.dev/sdk";
import type { myTask, STREAMS } from "./trigger/my-task";

// Somewhere in your backend
async function subscribeToStream(runId: string) {
  // Use a for-await loop to subscribe to the stream
  for await (const part of runs.subscribeToRun<typeof myTask>(runId).withStreams<STREAMS>()) {
    switch (part.type) {
      case "run": {
        console.log("Received run", part.run);
        break;
      }
      case "openai": {
        // part.chunk is of type OpenAI.ChatCompletionChunk
        console.log("Received OpenAI chunk", part.chunk);
        break;
      }
    }
  }
}
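In practice a subscriber usually wants the assembled text rather than a log line per chunk. The following is a minimal sketch of that accumulation step. It uses hypothetical stand-in types (`ChunkLike`, `Part`) that mirror only the fields used here, so it runs without the SDK; in a real backend the parts would come from runs.subscribeToRun(...).withStreams<STREAMS>() and the chunk type would be OpenAI.ChatCompletionChunk.

```typescript
// Hypothetical stand-ins for the shapes yielded by the subscription.
// Only the fields this sketch reads are modelled.
type ChunkLike = { choices: { delta?: { content?: string | null } }[] };

type Part =
  | { type: "run"; run: { id: string; status: string } }
  | { type: "openai"; chunk: ChunkLike };

// Concatenate the text deltas from every "openai" part, skipping run updates.
async function collectText(parts: AsyncIterable<Part>): Promise<string> {
  let text = "";
  for await (const part of parts) {
    if (part.type !== "openai") continue;
    for (const choice of part.chunk.choices) {
      // Some chunks carry no content (for example the final one), so default to "".
      text += choice.delta?.content ?? "";
    }
  }
  return text;
}
```

Because `collectText` accepts any `AsyncIterable`, it can be exercised with a plain async generator in tests.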
Multiple streams
You can register and subscribe to multiple streams in the same task:
import { task, metadata } from "@trigger.dev/sdk";
import OpenAI from "openai";

const openai = new OpenAI({
  apiKey: process.env.OPENAI_API_KEY,
});

export type STREAMS = {
  openai: OpenAI.ChatCompletionChunk;
  fetch: string; // Each chunk of the decoded response body is a string
};

export const myTask = task({
  id: "my-task",
  run: async (payload: { prompt: string }) => {
    const completion = await openai.chat.completions.create({
      messages: [{ role: "user", content: payload.prompt }],
      model: "gpt-3.5-turbo",
      stream: true,
    });

    // Register the OpenAI stream
    await metadata.stream("openai", completion);

    const response = await fetch("https://jsonplaceholder.typicode.com/posts");

    if (!response.body) {
      return;
    }

    // Register a fetch response stream
    // Pipe the response.body through a TextDecoderStream to convert it to a string
    await metadata.stream("fetch", response.body.pipeThrough(new TextDecoderStream()));
  },
});
You may notice above that we aren’t consuming either of the streams in the task. In the
background, we’ll wait until all streams are consumed before the task is considered complete (with
a max timeout of 60 seconds). If you have a longer running stream, make sure to consume it in the
task.
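If a stream may outlive that timeout, one option is to drain it explicitly inside the task. The helper below is a small sketch of that idea; `drain` is a hypothetical name, not an SDK function. It relies only on the fact, shown in the earlier examples, that the value returned by metadata.stream can be read with a for-await loop.

```typescript
// Read an async iterable to the end, discarding the chunks.
// Returns how many chunks were seen, which can be handy for logging.
async function drain(stream: AsyncIterable<unknown>): Promise<number> {
  let count = 0;
  for await (const _chunk of stream) {
    count += 1;
  }
  return count;
}

// Intended use inside a task (not executed here):
//   const stream = await metadata.stream("fetch", body);
//   await drain(stream); // the run does not finish until the stream ends
```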
Then subscribe to both streams:
import { runs } from "@trigger.dev/sdk";
import type { myTask, STREAMS } from "./trigger/my-task";

// Somewhere in your backend
async function subscribeToStream(runId: string) {
  for await (const part of runs.subscribeToRun<typeof myTask>(runId).withStreams<STREAMS>()) {
    switch (part.type) {
      case "run": {
        console.log("Received run", part.run);
        break;
      }
      case "openai": {
        // part.chunk is of type OpenAI.ChatCompletionChunk
        console.log("Received OpenAI chunk", part.chunk);
        break;
      }
      case "fetch": {
        // part.chunk is a string
        console.log("Received fetch chunk", part.chunk);
        break;
      }
    }
  }
}
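With several streams interleaved in one subscription, it can help to sort chunks into per-stream buffers before processing them. The sketch below shows one way to do that. `StreamPart` and `groupChunks` are hypothetical names, and the part shape is a simplified assumption (a `type` key plus an optional `chunk`) rather than the SDK's exact type.

```typescript
// Simplified, assumed shape of a subscription part.
type StreamPart = { type: string; chunk?: unknown };

// Collect chunks into one array per stream key, preserving arrival order.
// Parts whose type is not in `keys` (such as "run" updates) are ignored.
async function groupChunks(
  parts: AsyncIterable<StreamPart>,
  keys: string[]
): Promise<Map<string, unknown[]>> {
  const buckets = new Map<string, unknown[]>();
  for (const key of keys) buckets.set(key, []);

  for await (const part of parts) {
    const bucket = buckets.get(part.type);
    if (bucket) bucket.push(part.chunk);
  }
  return buckets;
}
```

Calling it with the keys "openai" and "fetch" would yield two arrays that can then be handled independently.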
Using with the AI SDK
The AI SDK provides a higher-level API for working with AI models. You can use it with the Streams API:
import { openai } from "@ai-sdk/openai";
import { logger, metadata, runs, schemaTask } from "@trigger.dev/sdk";
import { streamText } from "ai";
import { z } from "zod";

export type STREAMS = {
  openai: string;
};

export const aiStreaming = schemaTask({
  id: "ai-streaming",
  description: "Stream data from the AI SDK",
  schema: z.object({
    model: z.string().default("o1-preview"),
    prompt: z.string().default("Hello, how are you?"),
  }),
  run: async ({ model, prompt }) => {
    logger.info("Running OpenAI model", { model, prompt });

    const result = streamText({
      model: openai(model),
      prompt,
    });

    // pass the textStream to the metadata system
    const stream = await metadata.stream("openai", result.textStream);

    let text = "";

    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      text += chunk; // chunk is a string
    }

    return { text };
  },
});
When using tools with the AI SDK, you can access tool calls and results using the fullStream:
import { openai } from "@ai-sdk/openai";
import { logger, metadata, runs, schemaTask } from "@trigger.dev/sdk";
import { streamText, tool, type TextStreamPart } from "ai";
import { z } from "zod";

const tools = {
  getWeather: tool({
    description: "Get the weather in a location",
    parameters: z.object({
      location: z.string().describe("The location to get the weather for"),
    }),
    execute: async ({ location }) => ({
      location,
      temperature: 72 + Math.floor(Math.random() * 21) - 10,
    }),
  }),
};

export type STREAMS = {
  // Give the stream a type of TextStreamPart along with the tools
  openai: TextStreamPart<{ getWeather: typeof tools.getWeather }>;
};

export const aiStreamingWithTools = schemaTask({
  id: "ai-streaming-with-tools",
  description: "Stream data from the AI SDK and use tools",
  schema: z.object({
    model: z.string().default("gpt-4o-mini"),
    prompt: z
      .string()
      .default(
        "Based on the temperature, will I need to wear extra clothes today in San Francisco? Please be detailed."
      ),
  }),
  run: async ({ model, prompt }) => {
    logger.info("Running OpenAI model", { model, prompt });

    const result = streamText({
      model: openai(model),
      prompt,
      tools, // Pass in the tools to use
      maxSteps: 5, // Allow streamText to repeatedly call the model
    });

    // pass the fullStream to the metadata system
    const stream = await metadata.stream("openai", result.fullStream);

    let text = "";

    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      // chunk is a TextStreamPart
      if (chunk.type === "text-delta") {
        text += chunk.textDelta;
      }
    }

    return { text };
  },
});
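The task above only reads the text deltas, but the same loop can also pick out the tool calls and tool results that the fullStream carries. The sketch below illustrates that with a hypothetical, heavily simplified stand-in for TextStreamPart called `PartLike`. The field names `toolName`, `args`, and `result` are assumptions based on the AI SDK version implied by `textDelta` and `maxSteps` above; check them against the TextStreamPart type in your installed version before relying on them.

```typescript
// Hypothetical, simplified stand-in for the AI SDK's TextStreamPart union.
// Field names other than textDelta are assumptions, not confirmed by this page.
type PartLike =
  | { type: "text-delta"; textDelta: string }
  | { type: "tool-call"; toolName: string; args: unknown }
  | { type: "tool-result"; toolName: string; result: unknown }
  | { type: "finish" };

type Summary = {
  text: string;
  toolCalls: { toolName: string; args: unknown }[];
  toolResults: { toolName: string; result: unknown }[];
};

// Walk the stream once, accumulating text and recording tool activity.
async function summarizeParts(parts: AsyncIterable<PartLike>): Promise<Summary> {
  const summary: Summary = { text: "", toolCalls: [], toolResults: [] };

  for await (const part of parts) {
    switch (part.type) {
      case "text-delta":
        summary.text += part.textDelta;
        break;
      case "tool-call":
        summary.toolCalls.push({ toolName: part.toolName, args: part.args });
        break;
      case "tool-result":
        summary.toolResults.push({ toolName: part.toolName, result: part.result });
        break;
      default:
        // Other part types (such as "finish") are ignored in this sketch.
        break;
    }
  }
  return summary;
}
```

The same switch works on the subscriber side, since the "openai" parts received there carry the same chunk type.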
You can define a Trigger.dev task that can be used as a tool; it will automatically be invoked with triggerAndWait when the tool is called:
import { openai } from "@ai-sdk/openai";
import { logger, metadata, runs, schemaTask, toolTask } from "@trigger.dev/sdk";
import { streamText, tool, type TextStreamPart } from "ai";
import { z } from "zod";

export const getWeather = toolTask({
  id: "get-weather",
  description: "Get the weather for a location",
  // Define the parameters for the tool, which becomes the task payload
  parameters: z.object({
    location: z.string(),
  }),
  run: async ({ location }) => {
    // return mock data
    return {
      location,
      temperature: 72 + Math.floor(Math.random() * 21) - 10,
    };
  },
});

export type STREAMS = {
  // Give the stream a type of TextStreamPart along with the tools
  openai: TextStreamPart<{ getWeather: typeof getWeather.tool }>;
};

export const aiStreamingWithTools = schemaTask({
  id: "ai-streaming-with-tools",
  description: "Stream data from the AI SDK and use tools",
  schema: z.object({
    model: z.string().default("gpt-4o-mini"),
    prompt: z
      .string()
      .default(
        "Based on the temperature, will I need to wear extra clothes today in San Francisco? Please be detailed."
      ),
  }),
  run: async ({ model, prompt }) => {
    logger.info("Running OpenAI model", { model, prompt });

    const result = streamText({
      model: openai(model),
      prompt,
      tools: {
        getWeather: getWeather.tool, // pass getWeather.tool as a tool
      },
      maxSteps: 5, // Allow streamText to repeatedly call the model
    });

    // pass the fullStream to the metadata system
    const stream = await metadata.stream("openai", result.fullStream);

    let text = "";

    for await (const chunk of stream) {
      logger.log("Received chunk", { chunk });

      // chunk is a TextStreamPart
      if (chunk.type === "text-delta") {
        text += chunk.textDelta;
      }
    }

    return { text };
  },
});