Streaming Results

There are multiple (simple) steps you need to take in order to stream your Pipeline results.

Define the Pipeline as Streaming

Define the pipeline as streaming by adding the stream property to the pipeline definition.

index.ts

const jokeGptPipelineStream = aigur.pipeline
.create<{ subject:string }, ReadableStream>({
id: 'jokegptStream',
stream: true,
flow: (flow) =>
flow
.node(replaceString, ({ input }) => ({
text: input.subject,
modifier: 'tell me a joke about $(text)$',
}))
.node(gpt3PredictionStream, ({ prev, input, nodes }) => ({
prompt: prev.text,
}))
.output(({ prev }) => prev.stream),
});

Define the Output as a ReadableStream

The output of a streaming pipeline needs to be ReadableStream.

index.ts

const jokeGptPipelineStream = aigur.pipeline
.create<{ subject:string }, ReadableStream>({
id: 'jokegptStream',
stream: true,
flow: (flow) =>
flow
.node(replaceString, ({ input }) => ({
text: input.subject,
modifier: 'tell me a joke about $(text)$',
}))
.node(gpt3PredictionStream, ({ prev, input, nodes }) => ({
prompt: prev.text,
}))
.output(({ prev }) => prev.stream),
});

Use Dedicated Stream Nodes

Stream Nodes' are unique in that their output is a stream instead of an object. For streaming GPT3 results, use the gpt3PredictionStream node instead of the gpt3Prediction node.

index.ts

const jokeGptPipelineStream = aigur.pipeline
.create<{ subject:string }, ReadableStream>({
id: 'jokegptStream',
stream: true,
flow: (flow) =>
flow
.node(replaceString, ({ input }) => ({
text: input.subject,
modifier: 'tell me a joke about $(text)$',
}))
.node(gpt3PredictionStream, ({ prev, input, nodes }) => ({
prompt: prev.text,
}))
.output(({ prev }) => prev.stream),
});

Return the Stream from the Pipeline

The pipeline's output node should return the stream that was created in the flow.

index.ts

const jokeGptPipelineStream = aigur.pipeline
.create<{ subject:string }, ReadableStream>({
id: 'jokegptStream',
stream: true,
flow: (flow) =>
flow
.node(replaceString, ({ input }) => ({
text: input.subject,
modifier: 'tell me a joke about $(text)$',
}))
.node(gpt3PredictionStream, ({ prev, input, nodes }) => ({
prompt: prev.text,
}))
.output(({ prev }) => prev.stream),
});

Invoking the Pipeline

To invoke a streaming Pipeline on the server from the client you need to call the invokeStream function. It works in a similar way to invokeRemote but since when using streams you don't receive a single value as an output, rather a stream of values the signature is slightly different - invokeStream accepts a callback that will be called whenever a new value is emitted from the stream.

index.ts

let text = '';
jokePipeline.invokeStream('/api/jokePipeline',
{
subject: 'cats',
},
(newResult) => {
text += newResult;
},
);