Async Streaming with LangChain: Performance Optimization Tutorial 2026
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
## Welcoming the Future: Async Streaming with LangChain for Ultimate Performance
Imagine you're chatting with a super-smart robot, and it answers you instantly, word by word, just like a friend. This amazing speed is what we call `async streaming`. When you combine this with LangChain, a powerful tool for building AI apps, you get something truly magical. But even magic can be made faster, and that's what this guide is all about: `async streaming langchain optimization`.
We're going to explore how to make your LangChain applications incredibly fast and responsive, looking ahead to 2026. You'll learn simple yet powerful tricks to get the best performance from your AI assistants. Get ready to dive into the world of speed and efficiency!
### Understanding the Superpowers: Async and Streaming
Before we can optimize, let's make sure we understand the core ideas. We'll start with what "async" means in the world of computers. Then, we'll see why "streaming" is such a big deal for getting fast AI answers.
#### What is 'Async'? A Simple Explanation
Think of your computer as a chef in a kitchen. In the old way, the chef would cook one dish, serve it, then start the next. If boiling water takes 10 minutes, the chef just stands there and waits. This is like normal, or "synchronous," programming.
But with "async" (short for asynchronous), our super chef can start the water boiling, then immediately chop vegetables for another dish. While the veggies are being chopped, the water might boil, and the chef can quickly switch back to it. This way, many tasks move forward at once, making everything much faster. This approach is built on `Python asyncio fundamentals`, allowing your programs to juggle many tasks without getting stuck waiting. It's like having your computer do several things at the same time, giving you a smooth and quick experience.
You don't have to wait for one task to fully complete before starting another. This means your application feels snappier and more responsive to you. It's a cornerstone of modern, high-performance software.
#### Why 'Streaming' is Like Getting Answers Live
Now, let's talk about "streaming." Imagine watching a movie online. You don't have to download the whole movie before you start watching, right? Instead, pieces of the movie come to your screen one by one, letting you watch almost instantly.
Streaming with AI models works the same way. When LangChain talks to a big AI model, instead of waiting for the *entire* answer to be ready, you get the answer back in small chunks. This means you see the first words of the AI's reply very quickly. It feels much faster and more natural, like a real conversation.
This is especially helpful for long answers or complex questions, as you don't experience a long silence. It keeps you engaged and makes the application feel very responsive. The user experience is greatly improved because you get immediate feedback.
#### LangChain and Its Streaming Magic
LangChain is like a superhero toolkit for building applications with large language models (LLMs). It helps you connect different AI tools and services together easily. One of its best features is how well it handles streaming.
When you ask LangChain a question, it can talk to the AI model and start sending you the answer bit by bit, as soon as it gets them. This real-time flow is crucial for interactive apps, making your chatbots feel alive. LangChain provides built-in methods to easily tap into this streaming capability.
For a deeper dive into LangChain's basic usage, you might want to check out our previous post on [Getting Started with LangChain Basics]({{ site.baseurl }}/blog/getting-started-langchain-basics).
### Why We Need to Optimize: Making Fast Even Faster
Even with async and streaming built-in, there's always room for improvement. Think of it like a race car; it's already fast, but engineers always find ways to make it *even* faster. Our goal is to achieve the best `streaming throughput improvement` possible.
We want your LangChain apps to respond almost instantly, no matter how many people are using them or how complex the questions are. This is why `async streaming langchain optimization` is so important. Without careful optimization, your fast streaming app can sometimes feel sluggish.
Poorly optimized async applications can still experience delays or bottlenecks. We need to identify these slow spots and make targeted improvements. Our aim is to ensure your LangChain application always feels super quick.
#### Finding the Slow Spots: Common Bottlenecks
Sometimes, even with async and streaming, things can slow down. It's like a traffic jam on a superhighway. What causes these jams?
1. **Waiting for the AI**: The biggest waiting time is often for the AI model itself to think and generate its next piece of text. We can't make the AI think faster directly, but we can make sure our code isn't adding *extra* waiting time.
2. **Too Much Back and Forth**: If your program sends many tiny messages back and forth, the time it takes to send each message adds up. This creates overhead that slows down the overall process.
3. **Blocking Operations**: Even in an async program, if one part of your code accidentally does something that *forces* it to wait (a "blocking" operation), it can stop everything else. This is a common trap to avoid when dealing with `Python asyncio fundamentals`.
4. **Memory Problems**: If your program uses too much computer memory, it can become slow. This is where `memory management in async` becomes a key consideration.
5. **Inefficient Event Loop**: The "event loop" is like the chef's brain, deciding what to do next. If it's not managed well, tasks can get delayed. This is where `event loop optimization` comes in handy.
By understanding these common problems, we can create strategies to tackle them effectively. We're setting the stage for some serious performance enhancements.
### Core Optimization Strategies: Turbocharging Your LangChain Apps
Now for the fun part: making things super fast! We'll look at several strategies to boost your `async streaming langchain optimization`. These tips will help you get the most out of your applications.
#### 1. Mastering Python Asyncio Fundamentals for Speed
The backbone of all our async magic is Python's `asyncio` library. Understanding how to use it correctly is the first step to unlocking true speed. Let's look at some key parts.
##### The `async` and `await` Super Duo
You've already met `async` and `await`. Remember, `async` tells Python, "Hey, this function might do something that takes a while, so others can work while I'm doing it." And `await` says, "I need to wait for this async thing to finish, but please let other tasks run if they can."
Here's a simple example:
```python
import asyncio
async def cook_soup():
print("Starting soup...")
await asyncio.sleep(3) # Imagine this is boiling water
print("Soup ready!")
async def chop_vegetables():
print("Chopping vegetables...")
await asyncio.sleep(2) # Imagine this is cutting veggies
print("Vegetables chopped!")
async def main():
await cook_soup() # This waits for soup to finish before chopping
await chop_vegetables()
print("--- Sequential Cooking ---")
asyncio.run(main())
async def main_concurrent():
# Now, let's cook and chop at the same time!
await asyncio.gather(cook_soup(), chop_vegetables())
print("\n--- Concurrent Cooking ---")
asyncio.run(main_concurrent())
In the “Sequential Cooking” part, you’ll see “Starting soup…”, then a 3-second wait, then “Soup ready!”, then “Chopping vegetables…”, then a 2-second wait, then “Vegetables chopped!”. The total time is about 5 seconds.
In “Concurrent Cooking,” you’ll see “Starting soup…” and “Chopping vegetables…” almost at the same time. After 2 seconds, “Vegetables chopped!” appears, and after 3 seconds, “Soup ready!” appears. The total time is about 3 seconds, because they ran together! This simple concept is at the heart of Python asyncio fundamentals.
asyncio.gather for Concurrent Streaming
The asyncio.gather function, which you saw in the example above, is incredibly powerful. It lets you run many async functions at the same time and wait for all of them to finish. If you need to make several calls to an AI model or process multiple streams at once, asyncio.gather is your best friend. It orchestrates concurrent streaming perfectly.
Imagine you want to ask three different AI models a question and get their answers simultaneously. Instead of waiting for one model to finish before asking the next, asyncio.gather lets you fire off all requests at once. This drastically reduces the total time spent waiting.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
import asyncio
from langchain_core.runnables import RunnableLambda
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
# We'll use a placeholder for a real LangChain model for simplicity
# In a real app, you'd configure your LLM here.
# For example: model = ChatOpenAI(model="gpt-4", streaming=True)
async def ask_ai_model(model_name: str, question: str):
print(f"[{model_name}] Starting query...")
# Simulate an AI model thinking and streaming
# In a real LangChain app, you'd use model.astream_messages([HumanMessage(question)])
for char in f"Answer from {model_name} for '{question}': This is a streamed response. ":
await asyncio.sleep(0.05) # Simulate latency of streaming char by char
yield char
print(f"[{model_name}] Query finished.")
# Helper to run a generator and collect its output
async def collect_stream(model_name, question):
full_response = ""
async for chunk in ask_ai_model(model_name, question):
full_response += chunk
return full_response
async def main_concurrent_ai_queries():
print("--- Asking Multiple AI Models Concurrently ---")
question = "Tell me about large language models."
# Run multiple AI model calls at the same time
results = await asyncio.gather(
collect_stream("Model A", question + " (short)"),
collect_stream("Model B", question + " (medium)"),
collect_stream("Model C", question + " (long)")
)
for i, res in enumerate(results):
print(f"\nResult from Model {chr(65+i)}:\n{res[:100]}...") # Show first 100 chars
# To run this, you need an asyncio loop
# asyncio.run(main_concurrent_ai_queries())
This example shows how asyncio.gather enables you to run three simulated AI queries at the same time. Instead of waiting for Model A, then Model B, then Model C, they all start and progress together. This dramatically improves streaming throughput improvement when you have multiple independent tasks.
For more on asyncio.gather, check out the official Python documentation (External link).
Giving Others a Turn: asyncio.sleep(0) for Event Loop Optimization
Sometimes, even if you’re doing non-blocking work, your task might be doing a lot of calculations. This can hog the “event loop” (the chef’s brain) and prevent other tasks from running. asyncio.sleep(0) is a little trick that tells the event loop: “Hey, I’m just doing a lot of fast work, but if anyone else needs a turn, let them go now.”
It doesn’t actually make your program wait, but it gracefully yields control. This is a simple but effective technique for event loop optimization. It ensures fairness among all running async tasks, preventing any single task from monopolizing the CPU.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
async def busy_task(name, iterations):
print(f"{name}: Starting...")
for i in range(iterations):
# Simulate some quick work
_ = i * i
if i % 1000 == 0: # Every 1000 iterations, yield control
await asyncio.sleep(0) # Give other tasks a chance to run
print(f"{name}: Finished.")
async def main_sleep_0():
print("--- Demonstrating asyncio.sleep(0) ---")
task1 = asyncio.create_task(busy_task("Task A", 10000))
task2 = asyncio.create_task(busy_task("Task B", 15000))
# Run a third task that prints something every second
async def monitor():
for _ in range(5):
print("Monitor: Still running...")
await asyncio.sleep(1)
task3 = asyncio.create_task(monitor())
await asyncio.gather(task1, task2, task3)
print("All tasks complete.")
# asyncio.run(main_sleep_0())
Without asyncio.sleep(0) in busy_task, the “Monitor” task might not get a chance to print until busy_task is completely done. With asyncio.sleep(0), you’ll see “Monitor: Still running…” messages interspersed with the task progress, showing that the event loop is sharing CPU time more effectively.
2. LangChain Specific Async Patterns
LangChain is built to work beautifully with async. Knowing its specific async LangChain patterns will help you write very efficient code. It provides special methods for streaming.
astream and astream_events: The LangChain Way to Stream
LangChain offers astream and astream_events methods on its runnables (like chains or language models). These are your primary tools for getting streamed output from LangChain components.
astream: This method yields the raw output chunks from the language model or chain. For a chatbot, it would be the words and punctuation coming back.astream_events: This is more advanced. It yields different types of events as the chain processes, like when a tool is called, when a new message starts, or when a message chunk arrives. This is incredibly useful for building complex UIs that need to react to different stages of the AI’s thinking process.
Using these methods ensures you are tapping directly into LangChain’s efficient streaming pipeline. This is a core part of async streaming langchain optimization.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate
# Make sure you have your OpenAI API key set up as an environment variable
# For example: export OPENAI_API_KEY="sk-..."
async def lang_chain_astream_example():
print("\n--- LangChain astream Example ---")
model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, streaming=True)
prompt = ChatPromptTemplate.from_messages([
("human", "Tell me a very short story about a brave knight and a dragon.")
])
chain = prompt | model
print("Streaming story:")
async for chunk in chain.astream({"input": "story"}):
print(chunk.content, end="", flush=True)
print("\nStory finished.")
async def lang_chain_astream_events_example():
print("\n--- LangChain astream_events Example ---")
model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, streaming=True)
prompt = ChatPromptTemplate.from_messages([
("human", "What is the capital of France?")
])
chain = prompt | model
print("Streaming events:")
async for event in chain.astream_events({"input": "capital"}, version="v1"):
kind = event["event"]
if kind == "on_chat_model_stream":
content = event["data"]["chunk"].content
if content:
print(f"Model streamed: '{content}'")
elif kind == "on_chain_end":
print(f"Chain finished with final output: {event['data']['output']}")
else:
print(f"Event: {kind}")
print("\nEvents finished.")
async def main_langchain_streaming():
await lang_chain_astream_example()
await lang_chain_astream_events_example()
# To run this, you'll need an OpenAI API key.
# asyncio.run(main_langchain_streaming())
The astream method gives you the text chunks directly. astream_events provides a richer stream of information, allowing you to see not just the text but also internal events happening within the LangChain process. This level of detail is invaluable for advanced debugging and dynamic UI updates.
Async Callback Handlers for Real-time Feedback
Async callback handlers are super important for getting immediate updates during a long-running LangChain process. Imagine a progress bar or a UI that shows “AI is thinking…” then “Calling tool X…” then “Generating answer…”. Callbacks make this possible.
You can create your own custom callback handler to process chunks of information as they arrive. This is perfect for logging, monitoring, or sending real-time updates to a user interface. This ability to react instantly greatly contributes to streaming throughput improvement by providing timely feedback.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
from langchain_core.callbacks import AsyncCallbackHandler
from langchain_core.messages import HumanMessage
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
import asyncio
class MyStreamingCallbackHandler(AsyncCallbackHandler):
"""Custom async callback handler for streaming."""
def __init__(self, prefix: str = ""):
self.prefix = prefix
self.full_response_content = ""
async def on_llm_new_token(self, token: str, **kwargs) -> None:
"""Runs on new LLM token. Use this to print tokens as they arrive."""
print(f"{self.prefix} Token: '{token}'", end="", flush=True)
self.full_response_content += token
async def on_llm_end(self, response, **kwargs) -> None:
"""Runs at the end of LLM generation."""
print(f"\n{self.prefix} LLM finished. Total content: '{self.full_response_content}'")
self.full_response_content = "" # Reset for next use
async def on_chain_start(
self, serialized: dict, tags: list[str] | None = None, **kwargs
) -> None:
"""Runs when a chain starts."""
print(f"\n{self.prefix} Chain '{serialized.get('name', 'Unknown')}' starting...")
async def on_chain_end(self, outputs: dict, **kwargs) -> None:
"""Runs when a chain ends."""
print(f"{self.prefix} Chain ended.")
async def run_with_custom_callback():
print("\n--- Running with Custom Async Callback Handler ---")
model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, streaming=True)
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful assistant."),
("human", "Tell me a short poem about a sunny day.")
])
chain = prompt | model
my_callback = MyStreamingCallbackHandler(prefix="[AI RESPONSE] ")
# You can pass callback handlers to the .invoke() or .stream() methods
# For async streaming, use astream or ainvoke with callbacks list
print("Initiating streaming with callback...")
async for chunk in chain.astream({"input": "sunny day poem"}, config={"callbacks": [my_callback]}):
# The callback prints tokens, so we just print a separator here
pass
print("\nStreaming complete with callback.")
# asyncio.run(run_with_custom_callback())
In this example, the MyStreamingCallbackHandler prints each token as it arrives. This gives you immediate insight into what the AI is generating. For more advanced async callback handlers, you could store tokens in a buffer, update a web socket, or trigger other async tasks. You can learn more about AsyncCallbackHandler on the LangChain documentation site (External link).
3. Improving Streaming Throughput
Streaming throughput improvement is all about getting more data processed and delivered to you in the same amount of time. It’s about making the entire pipeline work more efficiently.
Batch vs Stream Performance: When to Choose Which
Sometimes, you have many questions to ask, not just one. Should you ask them one by one (stream), or group them together and ask all at once (batch)? This is the batch vs stream performance question.
- Streaming (one by one): Best for interactive chats or when you need answers right away for each question. Low “time to first token” for each query.
- Batching (many at once): If you have a list of 100 questions and don’t need each answer instantly, but want all 100 answers as fast as possible overall, batching can be much faster. You send all 100 questions, and the AI processes them in a group. This reduces overhead for each individual request.
The best choice depends on your specific use case. For a chat application, streaming is king. For processing a large document with many sub-questions, batching might be more efficient. Combining asyncio.gather with batch processing for independent tasks can provide significant speedups.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
import time
async def run_stream_queries_sequentially(questions):
print("\n--- Sequential Streaming Queries ---")
model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, streaming=True)
prompt = ChatPromptTemplate.from_messages([
("human", "{question}")
])
chain = prompt | model
start_time = time.monotonic()
for i, q in enumerate(questions):
print(f"\nQuestion {i+1}: {q}")
print("Response: ", end="")
async for chunk in chain.astream({"question": q}):
print(chunk.content, end="", flush=True)
end_time = time.monotonic()
print(f"\nTotal sequential streaming time: {end_time - start_time:.2f} seconds")
async def run_stream_queries_concurrently(questions):
print("\n--- Concurrent Streaming Queries (using astream and gather) ---")
model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, streaming=True)
prompt = ChatPromptTemplate.from_messages([
("human", "{question}")
])
chain = prompt | model
async def get_streamed_response(q_idx, question):
full_response = ""
# print(f"Starting concurrent stream for Question {q_idx+1}: {question}")
async for chunk in chain.astream({"question": question}):
full_response += chunk.content
# print(f"Finished concurrent stream for Question {q_idx+1}")
return f"Response for Q{q_idx+1}: {full_response[:100]}..."
start_time = time.monotonic()
# Create a list of async streaming tasks
tasks = [get_streamed_response(i, q) for i, q in enumerate(questions)]
# Run them all concurrently
responses = await asyncio.gather(*tasks)
end_time = time.monotonic()
for res in responses:
print(res)
print(f"\nTotal concurrent streaming time: {end_time - start_time:.2f} seconds")
async def run_batch_queries_concurrently(questions):
print("\n--- Batch Queries (using ainvoke and gather) ---")
model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0) # Streaming usually not needed for batch
prompt = ChatPromptTemplate.from_messages([
("human", "{question}")
])
chain = prompt | model
async def get_batch_response(q_idx, question):
response = await chain.ainvoke({"question": question})
return f"Batch Response for Q{q_idx+1}: {response.content[:100]}..."
start_time = time.monotonic()
# Create a list of async invoke tasks
tasks = [get_batch_response(i, q) for i, q in enumerate(questions)]
# Run them all concurrently
responses = await asyncio.gather(*tasks)
end_time = time.monotonic()
for res in responses:
print(res)
print(f"\nTotal batch query time: {end_time - start_time:.2f} seconds")
async def main_batch_vs_stream():
test_questions = [
"What is the capital of Canada?",
"Who invented the telephone?",
"Explain photosynthesis in one sentence.",
"What is the largest ocean on Earth?",
"Name two famous scientists.",
]
await run_stream_queries_sequentially(test_questions)
await run_stream_queries_concurrently(test_questions)
await run_batch_queries_concurrently(test_questions)
# asyncio.run(main_batch_vs_stream())
You’ll notice that for multiple independent questions, run_stream_queries_concurrently and run_batch_queries_concurrently are significantly faster than run_stream_queries_sequentially. The batching approach might be slightly faster overall if you don’t need intermediate streaming results, due to fewer network round trips or optimized API calls. This comparison is key to understanding batch vs stream performance.
4. Memory Management in Async
Computers have memory (RAM) where they keep data they are currently using. If your program uses too much memory, it can slow down or even crash. Memory management in async is about making sure your streaming applications are efficient with memory.
When you stream, data comes in chunks. Instead of storing all the chunks in a huge list or variable, try to process each chunk and then discard it if you don’t need to keep the whole history.
For example, if you’re just showing the AI’s response on a screen, you don’t need to save every single word in memory after it’s been displayed. You just need the current word. If you do need the full response later, concatenate the chunks as they arrive, but be mindful of how large the total response might become.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate
import sys
async def stream_and_manage_memory():
print("\n--- Memory Management in Async Streaming Example ---")
model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, streaming=True)
prompt = ChatPromptTemplate.from_messages([
("human", "Write a very long story about a space explorer discovering a new planet. Make it at least 500 words.")
])
chain = prompt | model
# Strategy 1: Store entire response in memory (can be bad for very long responses)
print("\nStrategy 1: Storing full response in a single string (less memory efficient for very long streams):")
full_response_string = ""
start_memory_string = sys.getsizeof(full_response_string)
async for chunk in chain.astream({"input": "long story"}):
full_response_string += chunk.content
# In a real app, you might update a UI here
# print(chunk.content, end="", flush=True) # Commented to avoid excessive output
end_memory_string = sys.getsizeof(full_response_string)
print(f"Full response length: {len(full_response_string)} characters.")
print(f"Memory used by string (bytes): {end_memory_string - start_memory_string}")
# print(f"Full response (first 200 chars): {full_response_string[:200]}...")
# Strategy 2: Process chunks and discard, or use a limited buffer
print("\nStrategy 2: Processing chunks without holding the full string (more memory efficient):")
# In this strategy, we don't store the full response in 'full_response_list'
# Instead, we process and potentially discard each chunk.
# We will simulate processing by simply printing.
chunk_count = 0
total_chars_processed = 0
# If we only need to display, we don't need to accumulate in memory
# If we need the full response at the end, a list of strings is often better than repeated string concatenation.
full_response_list = [] # Better for accumulating large responses
start_memory_list = sys.getsizeof(full_response_list)
async for chunk in chain.astream({"input": "long story"}):
content = chunk.content
if content:
# Simulate displaying or processing the chunk immediately
# print(f"Chunk received (len={len(content)}): '{content}'")
total_chars_processed += len(content)
full_response_list.append(content) # Accumulate efficiently
chunk_count += 1
# Here, we only keep the current chunk in memory for processing,
# not the entire growing string that gets copied repeatedly.
end_memory_list = sys.getsizeof(full_response_list)
print(f"Total chunks processed: {chunk_count}")
print(f"Total characters processed: {total_chars_processed}")
print(f"Memory used by list of chunks (bytes): {end_memory_list - start_memory_list}")
# print(f"Full response (first 200 chars from list): {''.join(full_response_list)[:200]}...")
# asyncio.run(stream_and_manage_memory())
Notice the difference in how sys.getsizeof() reports memory usage. Repeatedly appending to a string (full_response_string += chunk.content) can create many temporary string copies, leading to higher transient memory usage. Appending to a list and then joining (full_response_list.append(content)) is generally more memory-efficient for building a large string incrementally. This mindful approach is crucial for memory management in async, especially with large language model responses.
5. Profiling Async Streaming: Finding the Bottlenecks
Even with all these tips, sometimes your application might still feel slow. This is where profiling async streaming comes in. It’s like having a special detective tool that tells you exactly which parts of your code are taking the most time.
Python has built-in tools like cProfile (for synchronous code, but can give hints for async parts) and asyncio’s debug mode. For more detailed async profiling, external tools or carefully placed time.monotonic() calls are often needed.
Simple Profiling with time.monotonic()
The easiest way to start profiling is to measure how long different sections of your code take. Use time.monotonic() to get high-resolution timestamps.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
import asyncio
import time
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage
from langchain_core.prompts import ChatPromptTemplate
async def profile_a_stream_call():
print("\n--- Profiling a Single LangChain Stream Call ---")
model = ChatOpenAI(model="gpt-3.5-turbo", temperature=0, streaming=True)
prompt = ChatPromptTemplate.from_messages([
("human", "Write a short, engaging description of a new smartphone. Focus on its camera features.")
])
chain = prompt | model
start_full_process = time.monotonic()
# Measure time to first token
first_token_time = None
stream_start_time = time.monotonic()
response_content = ""
print("Streaming response:")
async for chunk in chain.astream({"input": "smartphone description"}):
if first_token_time is None:
first_token_time = time.monotonic()
print(chunk.content, end="", flush=True)
response_content += chunk.content
stream_end_time = time.monotonic()
print("\n") # Newline after streamed content
end_full_process = time.monotonic()
print(f"Total characters received: {len(response_content)}")
if first_token_time:
print(f"Time to first token: {first_token_time - stream_start_time:.4f} seconds")
print(f"Total stream duration (from first request to last chunk): {stream_end_time - stream_start_time:.4f} seconds")
print(f"Total function execution time (including setup/teardown): {end_full_process - start_full_process:.4f} seconds")
# asyncio.run(profile_a_stream_call())
This simple example helps you see how long it takes to get the first piece of information (time to first token) and the total time to get the whole stream. This is critical for assessing streaming throughput improvement.
asyncio Debug Mode
You can run your asyncio event loop in debug mode, which provides warnings about slow operations and unawaited tasks. This can be very helpful for catching subtle performance issues.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
import asyncio
import time
async def slow_blocking_task():
print("Slow task started (blocking)...")
time.sleep(1) # This blocks the event loop!
print("Slow task finished (blocking).")
async def fast_async_task():
print("Fast task started (async)...")
await asyncio.sleep(0.1) # This yields to event loop
print("Fast task finished (async).")
async def main_debug_mode():
print("\n--- Running in asyncio Debug Mode ---")
# This task will block and generate a warning in debug mode
task1 = asyncio.create_task(slow_blocking_task())
task2 = asyncio.create_task(fast_async_task())
await asyncio.gather(task1, task2)
if __name__ == "__main__":
# To enable debug mode, you usually set the environment variable
# or pass debug=True to asyncio.run()
# E.g., in your terminal: PYTHONASYNCIODEBUG=1 python your_script.py
# Or programmatically:
# asyncio.run(main_debug_mode(), debug=True)
print("To see debug warnings, run this script with 'asyncio.run(main_debug_mode(), debug=True)'")
print("Or set PYTHONASYNCIODEBUG=1 in your environment.")
When run with debug=True, asyncio will print warnings about slow_blocking_task because time.sleep(1) is a blocking call that prevents the event loop from running other tasks. This kind of warning is invaluable for profiling async streaming and identifying synchronous code that’s accidentally blocking your async pipeline.
6. Async Best Practices
To keep your async LangChain applications robust and performant, follow these general async best practices:
- Always
awaityourasyncfunctions: Forgetting toawaitan async function means it might not run at all, or it runs in the background without you waiting for its result, leading to bugs. - Avoid blocking calls in
asyncfunctions: Never usetime.sleep()or other synchronous I/O operations (likerequests.get()) directly inside anasyncfunction. Use theirasyncioequivalents (await asyncio.sleep(),await aiohttp.get()) or run them in an executor (await loop.run_in_executor(...)). - Use
asyncio.create_task()for background work: If you want to start an async task and continue doing other things without waiting for it, useasyncio.create_task(). Just remember to eventuallyawaitthe task or ensure it’s handled, perhaps in aasyncio.gather. - Handle exceptions: Just like in regular Python, use
try...exceptblocks to gracefully handle errors in your async tasks. Unhandled exceptions in async tasks can silently crash your application. - Leverage
asyncio.TimeoutError: For network requests or operations that might hang, useasyncio.wait_for()to set a timeout. This prevents your program from waiting forever.
By adhering to these principles, you ensure that your async streaming langchain optimization efforts lead to stable and high-performing applications. You can read more about general Python asyncio best practices in the official documentation (External link).
Putting It All Together: A Comprehensive Example (2026 Style)
Let’s combine several of these optimization techniques into a single, more advanced example. Imagine you’re building an AI assistant that can answer multiple complex questions quickly, providing real-time feedback.
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
import asyncio
import time
from langchain_openai import ChatOpenAI
from langchain_core.messages import HumanMessage, SystemMessage
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.callbacks import AsyncCallbackHandler
import random # For simulating varying task times
# --- Custom Async Callback Handler ---
class EnhancedStreamingCallbackHandler(AsyncCallbackHandler):
def __init__(self, task_id: int):
self.task_id = task_id
self.full_response_content = ""
self.start_time = time.monotonic()
print(f"Task {self.task_id}: Handler initialized.")
async def on_llm_start(self, serialized: dict, prompts: list[str], **kwargs) -> None:
print(f"Task {self.task_id}: LLM started processing prompt: '{prompts[0][:50]}...'")
async def on_llm_new_token(self, token: str, **kwargs) -> None:
# Simulate some quick processing or UI update for each token
# await asyncio.sleep(0.001) # Small sleep to simulate work without blocking
self.full_response_content += token
# In a real UI, you'd update a specific element for this task_id
# print(f"Task {self.task_id} received token: '{token}'") # Too verbose for full run
async def on_llm_end(self, response, **kwargs) -> None:
duration = time.monotonic() - self.start_time
print(f"Task {self.task_id}: LLM finished in {duration:.2f}s. Total chars: {len(self.full_response_content)}")
# print(f"Task {self.task_id} Final Response: {self.full_response_content[:100]}...") # Too verbose
self.full_response_content = "" # Reset
async def on_chain_start(
self, serialized: dict, tags: list[str] | None = None, **kwargs
) -> None:
print(f"Task {self.task_id}: Chain '{serialized.get('name', 'Unknown')}' starting...")
async def on_chain_end(self, outputs: dict, **kwargs) -> None:
print(f"Task {self.task_id}: Chain ended.")
# --- Async LangChain Runnable Function ---
async def get_ai_stream_response(task_id: int, question: str, model: ChatOpenAI):
print(f"Task {task_id}: Initiating AI query for: '{question[:30]}...'")
prompt = ChatPromptTemplate.from_messages([
("system", "You are a helpful and concise AI assistant. Respond briefly."),
("human", "{question}")
])
chain = prompt | model
# Use our custom async callback handler for real-time feedback
callback_handler = EnhancedStreamingCallbackHandler(task_id)
full_response_chunks = []
try:
async for chunk in chain.astream(
{"question": question},
config={"callbacks": [callback_handler], "tags": [f"task:{task_id}"]}
):
full_response_chunks.append(chunk.content)
# You could also add asyncio.sleep(0) here if the processing
# within the loop (e.g., UI updates) is computation-heavy
# await asyncio.sleep(0) # Yield control if necessary
except Exception as e:
print(f"Task {task_id}: An error occurred during streaming: {e}")
return f"Error for {question}: {e}"
final_response = "".join(full_response_chunks)
print(f"Task {task_id}: Full stream collected. Content length: {len(final_response)}")
return f"Task {task_id} Result: {final_response[:150]}..."
# --- Main Orchestrator for Concurrent Streaming ---
async def main_optimized_streaming_app():
print("--- Starting Optimized Async Streaming Application (2026 Edition) ---")
# 1. Initialize models (consider different models for different tasks if needed)
# Using a single model here for simplicity, but you could have multiple.
llm = ChatOpenAI(model="gpt-3.5-turbo", temperature=0.7, streaming=True, verbose=False)
# 2. Prepare multiple questions/tasks for concurrent processing
questions_to_ask = [
"Explain quantum computing in simple terms.",
"Write a short, optimistic quote about the future of AI.",
"List three benefits of renewable energy.",
"What is the capital of Australia and why was it chosen?",
"Describe a typical day for a deep-sea explorer.",
"Summarize the plot of 'Moby Dick' in 50 words.",
]
# 3. Use asyncio.gather for concurrent execution of streaming tasks
print(f"\nLaunching {len(questions_to_ask)} concurrent streaming tasks...")
overall_start_time = time.monotonic()
tasks = []
for i, q in enumerate(questions_to_ask):
# Simulate varying delays before starting some tasks
# await asyncio.sleep(random.uniform(0.1, 0.5))
task_coroutine = get_ai_stream_response(i + 1, q, llm)
tasks.append(task_coroutine)
# Gather all results. This will run the `get_ai_stream_response` for each question concurrently.
all_results = await asyncio.gather(*tasks)
overall_end_time = time.monotonic()
print("\n--- All Concurrent Streaming Tasks Completed ---")
for result in all_results:
print(result)
print(f"\nTotal execution time for {len(questions_to_ask)} tasks: {overall_end_time - overall_start_time:.2f} seconds")
print("\nThis demonstrates `async streaming langchain optimization` through concurrency and callbacks!")
if __name__ == "__main__":
# Ensure you have your OpenAI API key set as an environment variable (OPENAI_API_KEY)
# To run this code, uncomment the line below and ensure you have LangChain and OpenAI libraries installed:
# pip install langchain langchain-openai langchain-core
asyncio.run(main_optimized_streaming_app())
# For profiling, you might run with debug mode:
# asyncio.run(main_optimized_streaming_app(), debug=True)
In this comprehensive example, you can see several async streaming langchain optimization elements working together:
Python asyncio fundamentals:asyncandawaitare used throughout, andasyncio.gatherorchestrates the concurrent execution of multiple streaming tasks.async LangChain patterns: Thechain.astream()method is used to get streamed output from the LLM.async callback handlers:EnhancedStreamingCallbackHandlerprovides real-time updates and logs about each task’s progress, showing thestreaming throughput improvementby giving instant feedback.Concurrent streaming: Multiple questions are processed at the same time, significantly reducing the total time compared to processing them one after another.Event loop optimization: While not explicitly shown withasyncio.sleep(0), the non-blocking nature ofastreamand properawaitcalls ensures the event loop remains responsive.- Implicit
memory management in async: By processing chunks in the callback and only accumulating the full response once, we avoid excessive temporary memory allocations.
This setup is a powerful demonstration of how to build performant and responsive AI applications in 2026.
Looking Ahead to 2026: The Future of Async Streaming
The world of AI and async programming is always moving fast. What might async streaming langchain optimization look like in 2026?
- Smarter Event Loops: Python’s
asynciowill likely become even more intelligent, automatically optimizing how tasks are run. We might see built-in tools that makeevent loop optimizationeasier. - Built-in Batching: LangChain itself might offer even more sophisticated ways to handle
batch vs stream performanceautomatically, deciding the best approach for different scenarios. - Advanced Profiling Tools: Expect better, more integrated tools for
profiling async streamingthat give you clearer insights with less effort. These tools could even suggest optimizations. - Edge AI and Local Streaming: As AI models get smaller, we might see more
concurrent streaminghappening directly on your device, leading to even faster responses without relying on cloud services. - Cross-language Async: While this tutorial focuses on Python, the principles of async streaming are universal. We might see more seamless integration and optimization across different programming languages.
Staying updated with these trends will ensure your async streaming langchain optimization efforts remain cutting-edge.
Conclusion: Your Path to Super-Fast AI
You’ve now learned the secrets to building incredibly fast and responsive AI applications using async streaming with LangChain. From understanding the core Python asyncio fundamentals to implementing advanced async LangChain patterns and using async callback handlers, you have a powerful toolkit.
We’ve covered how to achieve streaming throughput improvement by choosing between batch vs stream performance, managing memory management in async, and how to play detective with profiling async streaming. These async best practices will serve you well in all your projects.
The future of AI is fast, interactive, and intelligent. By mastering these async streaming langchain optimization techniques, you’re not just ready for 2026; you’re building the future, today. Keep experimenting, keep learning, and keep making your AI applications faster and better for everyone!
```
Leave a comment