
LangChain Streaming Response Tutorial: Handle Errors and Timeouts Like a Pro

Imagine you’re building a smart app that talks to big language models, like ChatGPT. Instead of making you wait for the whole answer, these models can send it back piece by piece as it is generated. This way of getting answers is called “streaming.” It makes your app feel much faster and more responsive.

But what happens if something goes wrong during this streaming? Maybe your internet blinks, or the language model server gets busy. Your app might freeze, or show a confusing error message. This is why learning to handle errors and timeouts is super important.

This guide will show you how to make your LangChain streaming applications strong and reliable. We’ll learn how to catch problems before they break your app. You’ll discover how to handle common streaming errors gracefully.

You’ll also learn how to configure timeouts correctly. This ensures your app doesn’t wait forever for a response that might never come. By the end, you’ll be able to build LangChain apps that deal with unexpected issues like a pro.

Understanding LangChain Streaming

LangChain helps you build powerful apps using large language models. When you ask a language model a question, it can give you the full answer all at once. Or, it can “stream” the answer back to you piece by piece. Streaming is like watching a video load; you see parts of it as they arrive.

For many apps, streaming is much better. You don’t have to wait for the whole answer to appear at once. Instead, you see words pop up on your screen almost instantly. LangChain makes this streaming process easy to set up.
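
To make the difference concrete, here is a minimal sketch (assuming the langchain-openai package and a configured OpenAI API key) comparing a normal call with a streaming call:

from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0)

# Normal call: nothing is printed until the complete answer is ready
full_answer = llm.invoke("Name three uses for a paperclip.")
print(full_answer.content)

# Streaming call: chunks are printed one by one as the model generates them
for chunk in llm.stream("Name three uses for a paperclip."):
    print(chunk.content, end="", flush=True)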

LangChain uses special helpers called “callbacks” for streaming. These callbacks are like little helpers that watch the streaming process. They can do things each time a new piece of the answer arrives. We will use these callbacks a lot for our error handling.

Common Streaming Errors You Might Face

Even the best apps can run into problems. When you’re streaming data, many things can go wrong. Knowing these common streaming errors helps you prepare for them. Let’s look at some of the most frequent issues you might encounter.

Network Issues

Your app and the language model need a good internet connection. If this connection isn’t stable, problems can arise. The internet can be tricky: sometimes it works perfectly, and sometimes it’s a bit wobbly.

Connection Dropped: Imagine you’re on a video call, and suddenly your internet cuts out. The call stops working. The same thing can happen with your LangChain streaming app. The connection between your app and the language model server might just vanish. This is a common streaming error.

When the connection drops, your app stops receiving data. It might be left waiting indefinitely or show an incomplete response. We need ways to detect this quickly. This helps us tell the user what happened.

Slow Internet: Sometimes the internet doesn’t drop completely, but it gets very slow. Data trickles in at a snail’s pace. This can make your app feel very sluggish. It might seem like nothing is happening for a long time.

While not a complete error, slow internet can lead to timeouts. Your app might give up waiting if data arrives too slowly. You want your app to be responsive, even if the connection isn’t perfect.

Server Problems

The language model server itself can also have issues. It’s a powerful computer, but even powerful computers can get overwhelmed. These problems are often out of your control. However, you can make your app react gracefully to them.

API Rate Limits: Think of a popular store with a line to get in. If too many people try to enter at once, the store might say, “Please wait, we’re at capacity.” Language models do something similar with “rate limits.” They only allow a certain number of requests in a specific time frame.

If your app sends too many requests too quickly, the server will tell you to slow down. This is often an error message like “Too Many Requests” (HTTP 429). It means you’ve hit an API rate limit. Your LangChain streaming error handling needs to know how to manage this.
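
As a rough sketch of what handling this can look like (assuming the OpenAI Python SDK, which raises openai.RateLimitError for HTTP 429 responses), you can catch the rate-limit error explicitly and pause before trying again:

import time

import openai
from langchain_openai import ChatOpenAI

llm = ChatOpenAI(temperature=0)

try:
    for chunk in llm.stream("Summarize the plot of Hamlet in one sentence."):
        print(chunk.content, end="", flush=True)
except openai.RateLimitError:
    # HTTP 429: too many requests in a short time window
    print("\nRate limit hit. Waiting 10 seconds before trying again...")
    time.sleep(10)

Later sections show how to automate this waiting with retries and exponential backoff.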

Internal Server Errors (500s): This is like when a store says, “Sorry, we’re having technical difficulties right now.” The server is experiencing an unexpected problem. It might be a bug in their code or a temporary overload.

These errors usually show up as “Internal Server Error” or HTTP 500 status codes. They mean the server itself failed to process your request. You should treat these errors as temporary and potentially retry your request later.

Model Unavailable: Sometimes, the specific language model you’re trying to use might be down for maintenance. Or it might be temporarily overloaded and unable to respond. This is another type of server problem. The server might send a specific error code or a generic 500 error.

In these cases, your LangChain streaming app cannot get any response. It’s important to catch this and inform the user. You might suggest trying a different model or waiting a bit.

Timeout Problems

Waiting forever for something that never comes is frustrating. This is where timeouts become incredibly important. Timeout configuration is a key part of robust error handling.

Response Takes Too Long: Imagine ordering food, and the restaurant says it’ll be ready in 15 minutes. If an hour passes and you still don’t have your food, you’d probably give up. Your app works similarly. If the language model takes too long to start sending data or to finish its response, your app should stop waiting.

Without proper timeout configuration, your app might hang forever. This makes your app unusable and can waste resources. We need to tell our app, “If you don’t hear back in X seconds, give up.”

How LangChain Handles Basic Timeouts: Some language model integrations in LangChain have built-in timeout settings. You can often tell the client how long to wait for a connection or for the first byte of data. We’ll explore how to use these settings later on. They provide a basic layer of protection.

Partial Responses

Sometimes, a streaming response doesn’t finish completely. It stops midway. This is a common streaming error that needs special attention.

What Happens if Streaming Stops Midway? Imagine reading a story, and suddenly the book ends in the middle of a sentence. It’s confusing and unsatisfying. The same can happen with streaming. The connection might drop, or a server error might occur after some data has already been sent.

Your app will have an incomplete, or partial response. It’s crucial to acknowledge this. Simply displaying an unfinished sentence isn’t good user experience. This requires careful partial response recovery.

How to Deal with Incomplete Data: When you get a partial response, you have a few choices. You can display what you have and clearly mark it as incomplete. You could also try to regenerate the entire response from scratch. We’ll look at how to gather these partial responses using callbacks. This allows us to make informed decisions about how to display them.

Setting Up Your LangChain Stream for Error Handling

Now that we know what can go wrong, let’s learn how to prepare our LangChain app. We’ll start with the simplest way to catch errors. Then, we’ll move to more advanced techniques using LangChain’s powerful callback system.

The Basics: try-except

The most fundamental way to catch errors in Python is using try-except blocks. This is like putting a safety net around your code. If something breaks inside the try block, the except block will catch it. This prevents your whole program from crashing.

from langchain_openai import ChatOpenAI

llm = ChatOpenAI()  # any chat model works here

try:
    # Code that might cause an error
    response = llm.invoke("What is the capital of France?")
    print(response)
except Exception as e:
    # Code to run if an error occurs
    print(f"Oops, something went wrong: {e}")

This is a good start for general errors that stop the entire operation. However, for streaming, errors can happen at different points. They might even happen after some data has already been received. This is where LangChain’s callbacks become very useful.

Using LangChain Callbacks for Finer Control

LangChain callbacks are special functions that run at certain times. They are like event listeners. They can listen for when the language model starts, when it sends a chunk of data, or when it finishes. Crucially, they can also listen for errors. This is how we achieve precise LangChain streaming error handling.

What are Callbacks? Think of callbacks as reporters on a news desk. They get notified when something important happens. In LangChain, these reporters are BaseCallbackHandler classes. You can create your own custom callback handler. This allows you to define what happens during streaming events.

LangChain provides several methods within these callback handlers. These methods get called for different types of events. For error handling, a few are particularly important:

  • on_llm_error: Called when the LLM (Large Language Model) itself encounters an error.
  • on_tool_error: Called if a tool used by the LLM (like a search tool) has an error.
  • on_chain_error: Called if an error happens within a LangChain “chain” or agent.

AsyncIteratorCallbackHandler and BaseCallbackHandler: For streaming, you’ll often use AsyncIteratorCallbackHandler if your application is asynchronous. This handler collects chunks of data as they arrive. The base BaseCallbackHandler is more general and can be used for both sync and async operations. You will inherit from these to create your custom error handling logic.

Let’s look at an example of a custom callback. This callback will specifically catch errors and print a friendly message. This is a practical example related to langchain streaming error handling.

from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from typing import Any, Dict, List, Union

class StreamingErrorCallback(BaseCallbackHandler):
    """Callback handler for streaming errors."""

    def __init__(self):
        self.error_occurred = False
        self.partial_response = ""

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Runs on new LLM token. Useful for streaming."""
        if not self.error_occurred:
            self.partial_response += token
            # You might want to display this token to the user immediately
            # print(token, end="", flush=True)

    def on_llm_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
        """Runs when LLM errors."""
        self.error_occurred = True
        print("\n--- LLM Error Detected ---")
        print(f"Error type: {type(error).__name__}")
        print(f"Error message: {error}")
        if self.partial_response:
            print(f"Partial response received before error: {self.partial_response[:100]}...")
        print("Please try again or contact support if the issue persists.")

    def on_chain_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
        """Runs when chain errors."""
        self.error_occurred = True
        print("\n--- Chain Error Detected ---")
        print(f"Error type: {type(error).__name__}")
        print(f"Error message: {error}")
        print("An issue occurred within the processing chain.")

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Runs when LLM ends."""
        if not self.error_occurred:
            print("\n--- Streaming Completed Successfully ---")
        else:
            print("\n--- Streaming Interrupted Due to Error ---")
            
# How to use this callback (example setup):
# from langchain_openai import ChatOpenAI
# from langchain.prompts import ChatPromptTemplate
#
# model = ChatOpenAI(streaming=True, callbacks=[StreamingErrorCallback()])
# prompt = ChatPromptTemplate.from_messages([("human", "{question}")])
# chain = prompt | model
#
# try:
#     # Simulate an error by asking a question that might trigger an LLM-side issue
#     # Or by manually introducing an error during the chain execution
#     for chunk in chain.stream({"question": "Write a very long story about a duck who learns to fly, but cut off the internet connection halfway through the response."}):
#         # In a real app, you would be sending these chunks to the frontend
#         pass # The callback handles printing
# except Exception as e:
#     print(f"Caught top-level exception: {e}")

In this StreamingErrorCallback, on_llm_new_token collects the streaming output. If an error occurs, on_llm_error or on_chain_error will be called. It then sets error_occurred to True. This custom callback is a great way to improve your LangChain streaming error handling. It allows for graceful error messages and even helps with partial response recovery.

Handling Timeouts Like a Pro

Timeouts are crucial for responsive applications. You don’t want your app waiting forever for a response that might never arrive. Proper timeout configuration ensures your application remains quick and available. Let’s explore how to implement timeouts effectively.

Built-in Timeout Mechanisms

Many language model integrations within LangChain offer their own timeout settings. These are often the easiest to configure. They tell the underlying API client how long to wait.

LLM-specific Timeouts: For example, when using ChatOpenAI or OpenAI, you can pass a timeout parameter. This parameter controls how long the client will wait for the API call to complete. This includes both connection time and the time to receive the first byte of data.

from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Example: Setting a timeout for ChatOpenAI
# If the response doesn't start or complete within 5 seconds, it will raise an error.
model_with_timeout = ChatOpenAI(temperature=0, timeout=5) # Timeout in seconds
prompt = ChatPromptTemplate.from_messages([("human", "{question}")])
parser = StrOutputParser()
chain = prompt | model_with_timeout | parser

try:
    # A long generation combined with the short 5-second timeout should trigger a timeout error.
    for chunk in chain.stream({"question": "Tell me a very long and detailed story about a brave knight and a dragon."}):
        print(chunk, end="", flush=True)
except Exception as e:
    print(f"\nError: {e}")
    print("The LLM response timed out!")

This timeout parameter is a simple yet powerful way to prevent your app from hanging. It’s a key part of your timeout configuration. Make sure to check the documentation for the specific LLM you are using. Different models might have different timeout options.

External Timeout Wrappers

Sometimes, the built-in LLM timeouts might not be enough. Or you might want to apply a timeout to a broader section of your code. In such cases, you can use external Python features.

Using asyncio.wait_for (for async): If your LangChain application is asynchronous (using async and await), asyncio.wait_for is your best friend. It lets you run any awaitable (like an async function) with a maximum time limit. If the awaitable doesn’t finish in time, it raises an asyncio.TimeoutError.

import asyncio
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

async def get_streaming_response_with_timeout(question: str, timeout_seconds: int = 10):
    model = ChatOpenAI(temperature=0, streaming=True)
    prompt = ChatPromptTemplate.from_messages([("human", "{question}")])
    parser = StrOutputParser()
    chain = prompt | model | parser

    chunks = []  # collected outside the inner coroutine so partial data survives a timeout

    async def consume_stream():
        # asyncio.wait_for needs an awaitable, so the async-for loop is wrapped in a coroutine
        async for chunk in chain.astream({"question": question}):
            print(chunk, end="", flush=True)
            chunks.append(chunk)

    try:
        # Wrap the whole streaming loop in asyncio.wait_for
        await asyncio.wait_for(consume_stream(), timeout=timeout_seconds)
        return "".join(chunks)
    except asyncio.TimeoutError:
        print(f"\nError: The streaming response timed out after {timeout_seconds} seconds.")
        return "".join(chunks) + "\n[Response interrupted due to timeout]"
    except Exception as e:
        print(f"\nAn unexpected error occurred during streaming: {e}")
        return "".join(chunks) + "\n[Response interrupted due to error]"

# To run this async function:
# import nest_asyncio
# nest_asyncio.apply() # Needed when running from Jupyter/IPython, where an event loop is already running

# You would call it like this in an async context:
# await get_streaming_response_with_timeout("Tell me a story about a frog who became king.", timeout_seconds=5)
# If you are testing a timeout, make the story prompt very long, or use a short timeout.
# For example, try timeout_seconds=1 for a longer story.

This snippet shows powerful timeout configuration. It ensures that the entire streaming process, from start to finish, adheres to your time limit. This is vital for managing long-running or stalled requests.

Using signal module (for sync, with limitations): For synchronous code, Python’s signal module can sometimes be used to implement timeouts. However, this method is more complex and has significant limitations: it only works on Unix-like systems and can only interrupt the main thread. This makes it less flexible and often unsuitable for complex applications or cross-platform use. We recommend asyncio.wait_for for async code, or LLM-specific timeouts where available. If you do need the signal approach, the sketch below shows the basic pattern; see the Python documentation for signal for the full constraints.
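
A minimal Unix-only sketch using signal.SIGALRM (the helper names here are just for illustration); note that it only works in the main thread:

import signal

class SyncTimeoutError(Exception):
    pass

def _raise_timeout(signum, frame):
    raise SyncTimeoutError("Synchronous call timed out")

def call_with_timeout(func, timeout_seconds: int):
    """Run a blocking function with a hard timeout (Unix, main thread only)."""
    signal.signal(signal.SIGALRM, _raise_timeout)
    signal.alarm(timeout_seconds)  # schedule the alarm
    try:
        return func()
    finally:
        signal.alarm(0)  # always cancel the pending alarm

# Hypothetical usage with a blocking LLM call:
# result = call_with_timeout(lambda: llm.invoke("Tell me a long story."), 10)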

Robust Strategies for Connection Drops and Partial Responses

Dealing with unexpected disconnections and incomplete data is crucial. It directly impacts the user experience. You want your app to be resilient and informative, even when things go wrong. These strategies help with connection drop handling and partial response recovery.

Detecting Disconnection

Knowing when a connection drops is the first step. If the streaming simply stops without a clear error, it can be tricky. You need mechanisms to infer a disconnection.

on_llm_end not being called: In our custom StreamingErrorCallback, we have an on_llm_end method. This method should always be called when the LLM finishes its response successfully. If an error occurs or the connection drops before the LLM sends its full response, on_llm_end might not be called.

This absence is a strong signal that something went wrong. You can use a flag, like self.error_occurred, to track if an error was explicitly caught. If on_llm_end is called and self.error_occurred is False, then the streaming completed normally. If it’s not called, or called after self.error_occurred became True, you know there was an issue. This helps in connection drop handling.
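
Here is a minimal sketch of that idea, extending the StreamingErrorCallback from above with a flag that is only set when on_llm_end actually fires (the class and attribute names are just for illustration):

class CompletionTrackingCallback(StreamingErrorCallback):
    """Tracks whether the stream finished cleanly."""

    def __init__(self):
        super().__init__()
        self.stream_completed = False

    def on_llm_end(self, response, **kwargs):
        super().on_llm_end(response, **kwargs)
        self.stream_completed = True

# After the streaming call returns (or stops):
# if not callback.stream_completed and not callback.error_occurred:
#     print("Connection likely dropped: the stream ended without completing.")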

Partial Response Recovery

When a stream is interrupted, you might have some data. It’s often better to show this partial data than nothing at all. This is where partial response recovery comes in.

Storing partial data: Our StreamingErrorCallback already includes self.partial_response. Each time on_llm_new_token is called, it appends the new token to this string.

# From StreamingErrorCallback class
class StreamingErrorCallback(BaseCallbackHandler):
    # ... (other methods) ...
    def __init__(self):
        self.error_occurred = False
        self.partial_response = "" # This stores the data

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Runs on new LLM token. Useful for streaming."""
        if not self.error_occurred: # Only store if no error reported yet
            self.partial_response += token
            # In a real application, you might update a UI element here
            # For example, send `token` to a frontend WebSocket

This simple mechanism allows you to collect all the data that successfully arrived. Even if an error happens later, you still have the partial_response. This is a vital part of partial response recovery.

Displaying “Response interrupted” messages: Once you detect an interruption, inform the user clearly. Don’t just show an incomplete sentence. You can combine the partial_response with an informative message.

# Inside your main application logic, after streaming
if callback.error_occurred: # Assuming 'callback' is your StreamingErrorCallback instance
    print("\n--- Response Interrupted ---")
    if callback.partial_response:
        print("Here is what we got so far:")
        print(callback.partial_response)
    print("We encountered an issue getting the full response. Please try again.")
else:
    print("\n--- Full Response Received ---")
    print(callback.partial_response)

This user-friendly approach transforms a potentially confusing error into a clear communication. It’s better for your users to know that an error occurred. They understand that the response is incomplete. This is a form of graceful error messages.

Implementing Retry Mechanisms

Sometimes, errors are just temporary. The server might be busy for a moment, or there might be a small network glitch. In these cases, simply trying again a few seconds later can solve the problem. This is where retry mechanisms are incredibly useful.

Why Retry?

Transient errors: Many errors are “transient.” This means they are not permanent. They go away if you wait a short while and try again. API rate limits, temporary server overloads, or brief network hiccups are examples of transient errors. For these, a retry mechanism is perfect.

If you don’t retry, your app might fail for a temporary problem. This is frustrating for users. Retrying makes your application more resilient. It improves the chances of success without user intervention.

Simple Retries

You can implement a basic retry logic with a for loop and time.sleep(). This attempts the operation a fixed number of times.

import time
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

def get_response_with_retries(question: str, max_retries: int = 3):
    model = ChatOpenAI(temperature=0)
    prompt = ChatPromptTemplate.from_messages([("human", "{question}")])
    parser = StrOutputParser()
    chain = prompt | model | parser

    for attempt in range(max_retries):
        try:
            response = chain.invoke({"question": question})
            print(f"Attempt {attempt + 1}: Success!")
            return response
        except Exception as e:
            print(f"Attempt {attempt + 1}: Failed with error: {e}")
            if attempt < max_retries - 1:
                print("Retrying in 2 seconds...")
                time.sleep(2) # Wait for a bit before retrying
            else:
                print("Max retries reached. Operation failed permanently.")
                raise # Re-raise the last exception if all retries fail

# Example usage:
# try:
#     final_answer = get_response_with_retries("Explain quantum entanglement simply.", max_retries=5)
#     print(f"\nFinal answer: {final_answer}")
# except Exception as e:
#     print(f"The operation ultimately failed: {e}")

This basic retry mechanism is a good start. However, it always waits for the same amount of time. This isn’t always the best strategy.

Retry with Exponential Backoff

A smarter way to retry is using “exponential backoff.” This means waiting a bit longer each time you retry. It’s like gently knocking on a door, then waiting a bit longer before knocking again. If still no answer, you wait even longer the next time.

Why it’s better:

  • Prevents overwhelming the server: If the server is truly overloaded, retrying too quickly just makes it worse. Exponential backoff gives the server time to recover.
  • Reduces network congestion: Fewer rapid retries mean less unnecessary network traffic.
  • More efficient: You don’t waste time making requests that are likely to fail.

Libraries like tenacity make implementing retry with exponential backoff very easy. It uses decorators, which are special functions that wrap around your existing code.

from tenacity import retry, wait_exponential, stop_after_attempt
import random
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Assume a function that might fail
def might_fail_stream(question: str):
    # Simulate a 30% chance of failure for demonstration
    if random.random() < 0.3:
        raise ConnectionError("Simulated network issue or API error!")
    
    # In a real scenario, this would be your LangChain streaming call
    model = ChatOpenAI(temperature=0, streaming=True)
    prompt = ChatPromptTemplate.from_messages([("human", "{question}")])
    parser = StrOutputParser()
    chain = prompt | model | parser

    full_response = ""
    for chunk in chain.stream({"question": question}):
        print(chunk, end="", flush=True)
        full_response += chunk
    return full_response

@retry(wait=wait_exponential(multiplier=1, min=4, max=10), stop=stop_after_attempt(5))
def robust_streaming_call(question: str):
    """
    Attempts to stream a response, retrying with exponential backoff on failure.
    """
    print(f"\nAttempting streaming for: '{question}'...")
    return might_fail_stream(question)

# Example Usage:
# try:
#     print("Starting robust streaming call with tenacity...")
#     final_result = robust_streaming_call("Explain large language models simply.")
#     print(f"\nFinal result after potential retries: {final_result}")
# except Exception as e:
#     print(f"\nOperation failed after multiple retries: {e}")

In this example, @retry is the magic part. wait_exponential tells tenacity to wait longer each time. min=4, max=10 means it will wait at least 4 seconds, but no more than 10 seconds, before the next retry. stop_after_attempt(5) means it will try a maximum of 5 times.

For a deeper dive into retries with exponential backoff, you can read our other blog post on Deep Dive into Retries with Exponential Backoff.

Advanced Error Handling Patterns

Beyond basic retries and timeouts, some patterns can make your streaming applications even more robust. These are particularly useful for high-traffic applications or when dealing with unreliable external services.

Circuit Breaker Pattern

Imagine an electrical circuit. If there’s an overload, a circuit breaker trips. This stops the current to prevent damage. In software, a circuit breaker pattern does something similar. It stops your application from continuously trying to access a failing service.

What it is and why it’s useful: If a language model API is constantly returning errors, retrying immediately can make things worse. It adds more load to an already struggling service. A circuit breaker monitors failures. If failures reach a certain threshold, it “opens” the circuit. This means it stops sending requests to the failing service for a while.

After a timeout period, it tries a single request to see if the service has recovered. If it succeeds, the circuit “closes,” and requests can flow again. If it fails, the circuit stays open. This prevents your application from hammering a broken service. It saves resources and provides faster feedback to your users.

Preventing overwhelming a failing service: The circuit breaker ensures that if the LLM provider is having a major outage, your app doesn’t contribute to the problem. It gracefully steps back. Instead of failing on every request, it fails fast once the circuit is open.

How to implement it simply (pybreaker library): Libraries like pybreaker make implementing circuit breakers straightforward in Python.

from pybreaker import CircuitBreaker, CircuitBreakerError
import random
import time
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser

# Create a circuit breaker that trips after 3 failures
# and stays open for 5 seconds
llm_breaker = CircuitBreaker(fail_max=3, reset_timeout=5)

# Wrap your LLM call with the circuit breaker
@llm_breaker
def call_llm_with_breaker(question: str):
    # Simulate a flaky LLM service: 70% chance of success, 30% failure
    if random.random() < 0.3:
        raise ConnectionError("Simulated LLM service unavailable!")
    
    model = ChatOpenAI(temperature=0, streaming=True)
    prompt = ChatPromptTemplate.from_messages([("human", "{question}")])
    parser = StrOutputParser()
    chain = prompt | model | parser

    full_response = ""
    for chunk in chain.stream({"question": question}):
        # print(chunk, end="", flush=True) # Uncomment to see streaming
        full_response += chunk
    return full_response

# Example Usage:
# print("--- Circuit Breaker Demo ---")
# for i in range(10):
#     try:
#         print(f"Request {i+1}: Trying to call LLM...")
#         result = call_llm_with_breaker(f"What is the capital of planet {i+1}?")
#         # print(f"\nResult: {result[:50]}...")
#         print(f"Request {i+1}: SUCCESS")
#     except CircuitBreakerError:
#         print(f"Request {i+1}: Circuit is OPEN! Not attempting call. Skipping...")
#         time.sleep(1) # Wait a bit before trying again
#     except ConnectionError as e:
#         print(f"Request {i+1}: FAILED with {e}. Breaker's failure count increased.")
#     except Exception as e:
#         print(f"Request {i+1}: Unexpected error: {e}")
#     time.sleep(0.5) # Small delay between requests
#
# print("\n--- Circuit Breaker Demo Finished ---")

This demo shows how a circuit breaker can protect your application. When the LLM service consistently fails, the circuit breaker opens. It then quickly rejects subsequent requests. This is very useful for graceful error messages and overall stability.

To learn more about implementing circuit breakers in Python, you can check out our post on Implementing Circuit Breakers in Python.

Idempotency

Idempotency is a fancy word for a simple concept. It means that performing an operation multiple times has the same effect as performing it once. For streaming, this is about ensuring that if you retry a request, you don’t create duplicate or incorrect data.

What it means for streaming operations: If your LangChain app sends a request to “generate a story” and the connection drops, you might retry. If the underlying LLM call is not idempotent, retrying could start a new story or save a duplicate. You want to ensure that retrying a request for a streaming response correctly resumes the previous one or replaces it cleanly.

Ensuring an operation can be retried without side effects: For streaming LLM responses, strict idempotency is often handled by the LLM provider. They might use request IDs to ensure that if the same request is sent again, they return the same answer. On your end, you should do the following (a minimal sketch follows this list):

  • Cleanly handle partial data: If you retry, clear out the old partial response before starting anew.
  • Use unique request IDs: If the LLM API supports it, pass a unique ID with each request. This can help the service identify retries.
  • Be careful with side effects: If your streaming app saves parts of the response to a database, ensure that retries don’t create duplicate entries.
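
Here is a minimal sketch of those three points, assuming a chain that streams string chunks (for example, one ending in StrOutputParser); the request_id and the save_response helper are illustrative, not real LLM API features:

import uuid

def stream_with_clean_retries(chain, question: str, max_attempts: int = 3):
    request_id = str(uuid.uuid4())  # one ID for every attempt of this logical request
    for attempt in range(max_attempts):
        partial_chunks = []  # clear old partial data before each attempt
        try:
            for chunk in chain.stream({"question": question}):
                partial_chunks.append(chunk)
            full_response = "".join(partial_chunks)
            # Key storage by request_id so a retried request overwrites rather than duplicates
            save_response(request_id, full_response)  # hypothetical persistence helper
            return full_response
        except Exception:
            if attempt == max_attempts - 1:
                raise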

User Experience and Graceful Error Messages

Even with the best error handling, sometimes things will still go wrong. How you communicate these issues to your users is vital. Graceful error messages can turn a frustrating experience into an understandable one. This is key for user-friendly error display.

Clear Feedback

When an error occurs, users need to know what happened. Vague messages are unhelpful. Be specific and actionable if possible.

“Something went wrong, please try again.”: This is a good default message for general, unspecific errors. It tells the user there was a problem and suggests a simple solution.

“Response took too long.”: When a timeout occurs, explicitly state it. This explains why the full response wasn’t received. It’s much clearer than a blank screen.

“Connection interrupted.”: If you detect a network disconnection, tell the user. This helps them understand that the issue might be with their internet. They might check their network connection.
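
One simple way to keep these messages consistent is a small helper that maps low-level exceptions to user-facing text; a sketch, assuming timeouts and connection problems surface as asyncio.TimeoutError and ConnectionError:

import asyncio

def user_facing_message(error: Exception) -> str:
    """Translate low-level exceptions into friendly, actionable messages."""
    if isinstance(error, asyncio.TimeoutError):
        return "Response took too long. Please try again."
    if isinstance(error, ConnectionError):
        return "Connection interrupted. Please check your network and try again."
    return "Something went wrong, please try again."

# Example usage after catching an error:
# print(user_facing_message(e))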

Displaying Partial Responses

As discussed earlier, showing what you have is better than nothing. Combine your partial response with an explanatory message.

Your story so far: "Once upon a time, in a land far away, lived a brave little..."

[Response interrupted due to network error. Please try generating the story again.]

This approach helps the user see that progress was made. It also clearly indicates that the response is incomplete. This is a great example of user-friendly error display and partial response recovery in action.

Logging for Debugging

While user-friendly messages are for the end-user, logging is for you, the developer. Good logging is essential for understanding and fixing problems. When streaming failures happen, detailed logs are invaluable.

Importance of structured logging: Instead of just printing messages, use Python’s logging module. It allows you to:

  • Set log levels (DEBUG, INFO, WARNING, ERROR, CRITICAL).
  • Direct logs to files, databases, or external monitoring systems.
  • Include structured data (e.g., JSON) for easier analysis.

What to log:

  • Error type: The specific class of the error (e.g., TimeoutError, ConnectionRefusedError).
  • Timestamp: When the error occurred.
  • User ID/Session ID: If applicable, to link errors to specific user sessions.
  • Prompt/Request ID: What question was being asked when the error happened.
  • Partial response: Any data received before the error. This is crucial for debugging.
  • Stack trace: The full call stack of where the error originated.
import logging
from typing import Any, Union

logging.basicConfig(level=logging.ERROR, format='%(asctime)s - %(levelname)s - %(message)s')

# Assumes the StreamingErrorCallback class defined earlier is in scope.
class LoggingStreamingErrorCallback(StreamingErrorCallback):
    """Extends StreamingErrorCallback to include logging."""

    def on_llm_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
        super().on_llm_error(error, **kwargs) # Call parent's error handling
        
        # Log the error details for debugging
        logging.error(
            "LLM Streaming Error occurred",
            exc_info=True, # This adds the stack trace
            extra={
                "error_type": type(error).__name__,
                "error_message": str(error),
                "partial_response_len": len(self.partial_response),
                "partial_response_snippet": self.partial_response[:100] # Log a snippet
                # Add more context like user_id, session_id, prompt_text if available
            }
        )

# When creating your LLM, use this new callback
# model = ChatOpenAI(streaming=True, callbacks=[LoggingStreamingErrorCallback()])

This snippet shows how to enhance your callback for logging streaming failures. It provides rich context for every error. This makes troubleshooting much easier.

Putting It All Together: A Comprehensive Example

Let’s combine several of these ideas into a more comprehensive example. We’ll create an asynchronous function that handles streaming with timeouts, retries, and custom error logging. This illustrates powerful langchain streaming error handling.

import asyncio
import logging
from tenacity import retry, wait_exponential, stop_after_attempt, retry_if_exception_type
from langchain_openai import ChatOpenAI
from langchain_core.prompts import ChatPromptTemplate
from langchain_core.output_parsers import StrOutputParser
from langchain_core.callbacks import BaseCallbackHandler
from langchain_core.outputs import LLMResult
from typing import Any, Dict, List, Union

# Configure basic logging
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')

class AdvancedStreamingErrorCallback(BaseCallbackHandler):
    """
    A comprehensive callback handler for streaming,
    collecting tokens, and logging detailed errors.
    """
    def __init__(self):
        self.full_response_content = ""
        self.error_occurred = False
        self.error_details = None

    def on_llm_new_token(self, token: str, **kwargs: Any) -> None:
        """Collects new tokens for the response."""
        if not self.error_occurred:
            self.full_response_content += token
            # In a real application, you'd send this to the client (e.g., WebSocket)
            print(token, end="", flush=True)

    def on_llm_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
        """Handles LLM-specific errors, logging them and setting flags."""
        self.error_occurred = True
        self.error_details = error
        logging.error(
            f"LLM Streaming Error: {type(error).__name__} - {error}",
            exc_info=True,
            extra={
                "error_type": type(error).__name__,
                "error_message": str(error),
                "partial_response_len": len(self.full_response_content),
                "partial_response_snippet": self.full_response_content[:100]
            }
        )
        print(f"\n--- Streaming Error: {type(error).__name__} ---")
        if self.full_response_content:
            print(f"Partial response received: {self.full_response_content[:200]}...")
        print("Please try again later. (Error logged for analysis)")

    def on_chain_error(self, error: Union[Exception, KeyboardInterrupt], **kwargs: Any) -> None:
        """Handles Chain-specific errors."""
        self.error_occurred = True
        self.error_details = error
        logging.error(
            f"Chain Streaming Error: {type(error).__name__} - {error}",
            exc_info=True,
            extra={
                "error_type": type(error).__name__,
                "error_message": str(error),
                "partial_response_len": len(self.full_response_content),
                "partial_response_snippet": self.full_response_content[:100]
            }
        )
        print(f"\n--- Chain Error: {type(error).__name__} ---")
        if self.full_response_content:
            print(f"Partial response received: {self.full_response_content[:200]}...")
        print("An internal process failed. (Error logged)")

    def on_llm_end(self, response: LLMResult, **kwargs: Any) -> None:
        """Finalizes the streaming process."""
        if not self.error_occurred:
            print("\n--- Streaming completed successfully ---")
        else:
            print("\n--- Streaming terminated due to error ---")

# Define exceptions that we want to retry on
RETRYABLE_EXCEPTIONS = (
    asyncio.TimeoutError,
    ConnectionError,
    # Add specific LLM API errors if known, e.g., openai.APIStatusError
)

@retry(
    wait=wait_exponential(multiplier=1, min=2, max=10), # Exponential backoff: 2, 4, 8 seconds...
    stop=stop_after_attempt(4),                      # Try up to 4 times
    retry=retry_if_exception_type(RETRYABLE_EXCEPTIONS), # Only retry these specific errors
    reraise=True # Re-raise the last exception if all retries fail
)
async def robust_langchain_stream(question: str, llm_timeout: int = 20):
    """
    Performs a LangChain streaming call with robust error handling,
    including timeouts, retries, and custom callbacks.
    """
    logging.info(f"Starting robust streaming for: '{question}' (LLM timeout: {llm_timeout}s)")
    callback_handler = AdvancedStreamingErrorCallback()
    
    # Configure LLM with streaming and the custom callback, plus a direct timeout
    llm = ChatOpenAI(temperature=0.7, streaming=True, callbacks=[callback_handler], timeout=llm_timeout)
    prompt = ChatPromptTemplate.from_messages([("human", "{question}")])
    parser = StrOutputParser()
    chain = prompt | llm | parser

    try:
        # Wrap the async streaming call with an overall timeout using asyncio.wait_for
        # This catches timeouts even if the LLM's internal timeout doesn't fire for some reason
        await asyncio.wait_for(
            _run_streaming_chain(chain, question),
            timeout=llm_timeout + 5 # Give a little extra buffer beyond LLM's internal timeout
        )
        
        if callback_handler.error_occurred:
            raise callback_handler.error_details # Re-raise the caught error for retry logic
        return callback_handler.full_response_content
    except asyncio.TimeoutError:
        callback_handler.error_occurred = True
        timeout_msg = f"Overall streaming operation timed out after {llm_timeout + 5} seconds."
        logging.error(timeout_msg)
        print(f"\n--- {timeout_msg} ---")
        if callback_handler.full_response_content:
            print(f"Partial response: {callback_handler.full_response_content[:200]}...")
        raise asyncio.TimeoutError(timeout_msg) # Re-raise for tenacity
    except Exception as e:
        # Any other unexpected error that wasn't caught by the callback or tenacity
        logging.error(f"Unexpected error in robust_langchain_stream: {e}", exc_info=True)
        print(f"\n--- Unexpected Error: {type(e).__name__} ---")
        print("An unforeseen issue occurred. Please check logs.")
        raise # Re-raise for tenacity

async def _run_streaming_chain(chain, question: str):
    """Internal helper to run the streaming chain."""
    async for _ in chain.astream({"question": question}):
        # Tokens are handled by the callback_handler.on_llm_new_token
        pass

# Example of how to run this in an async context
# async def main():
#     print("--- Starting comprehensive streaming demo ---")
#     try:
#         # This question is intentionally long to test timeouts/retries
#         final_response = await robust_langchain_stream(
#             "Write a comprehensive essay discussing the ethical implications of artificial intelligence in creative arts, including arguments for and against its use, and potential future scenarios. Make it very long.",
#             llm_timeout=10 # Set a short timeout to trigger errors for testing
#         )
#         print("\n--- Final Full Response (if successful) ---")
#         print(final_response)
#     except Exception as e:
#         print(f"\n--- Robust streaming failed after all retries: {e} ---")
#     print("\n--- Demo Finished ---")

# import nest_asyncio
# nest_asyncio.apply()
# asyncio.run(main())

This comprehensive snippet demonstrates how to leverage:

  • Custom Callback: AdvancedStreamingErrorCallback for collecting partial responses and logging detailed error information.
  • LLM Timeout: Passed directly to ChatOpenAI.
  • Overall Timeout: Using asyncio.wait_for to ensure the entire operation completes within a defined time.
  • Retry with Exponential Backoff: The @retry decorator from tenacity handles retrying specific types of transient errors.
  • Graceful User Messages: Printed messages inform the user about success, partial responses, or failures.
  • Detailed Logging: Errors are logged with logging.error including stack traces and context.

This combination provides powerful langchain streaming error handling. It creates a highly resilient streaming application.

Best Practices for LangChain Streaming Error Handling

Building robust applications means being prepared for failures. By following these best practices, you can make your LangChain streaming apps reliable and user-friendly.

  • Always use try-except: Wrap your main streaming calls in try-except blocks. This catches unexpected errors that might slip through other mechanisms. It prevents your entire application from crashing.
  • Leverage LangChain callbacks: Design custom BaseCallbackHandler classes. Use on_llm_error, on_chain_error, and on_tool_error to catch and manage errors at specific points. These callbacks are essential for fine-grained control and partial response recovery.
  • Implement timeouts: Configure timeouts at multiple levels. Use the timeout parameter for your LLM client (e.g., ChatOpenAI). For asynchronous code, asyncio.wait_for provides an excellent way to enforce overall operation timeouts. Timeout configuration is a cornerstone of responsive apps.
  • Consider retries for transient errors: Use libraries like tenacity to implement retry with exponential backoff. This automatically handles temporary network issues or API rate limits. It makes your app much more resilient to intermittent problems.
  • Provide clear user feedback: When an error occurs, give your users actionable and understandable messages. Inform them about connection drops, timeouts, or general issues. Showing partial responses with a clear “interrupted” message is a good example of user-friendly error display.
  • Log everything: Implement structured logging for all streaming failures. Include error types, messages, timestamps, and any partial data received. This is crucial for logging streaming failures and debugging problems in production.
  • Test your error paths: Don’t just test the “happy path.” Actively simulate network issues, slow responses, and server errors. This ensures your error handling logic works as expected. A robust application is one that has been thoroughly tested under adverse conditions.

Conclusion

You’ve now learned how to handle errors and timeouts in your LangChain streaming applications like a true professional. We covered common streaming errors, from network issues to server problems. You saw how vital timeout configuration is for responsive apps. We explored basic try-except blocks and delved into the power of LangChain callbacks for precise LangChain streaming error handling.

You also discovered advanced techniques like retry with exponential backoff and the circuit breaker pattern. These strategies help your app recover from transient issues and prevent overwhelming failing services. Remember the importance of graceful error messages and detailed logging for debugging.

By applying these principles, you’ll build LangChain applications that are not just smart, but also incredibly resilient. Your users will appreciate an app that provides clear feedback, even when things go wrong. Go forth and build robust streaming applications!
