Custom Document Loaders in LangChain: Advanced Tutorial with Code
LangChain is a powerful toolkit for building applications with large language models. It helps you connect different pieces, like talking to an AI model or fetching information from your own data. A very important part of LangChain is how it gets your data ready for the AI.
This process often involves something called a “document loader.” These loaders take your files, like PDFs or text documents, and turn them into a format LangChain can understand. While LangChain offers many built-in loaders, sometimes your data is unique.
That’s where custom document loaders in LangChain come in handy. They let you tell LangChain exactly how to read and prepare your special files or information sources. This guide will show you how to build your own, step by step.
You will learn how to handle different data types and make your applications smarter. This tutorial is for you if you want to unlock LangChain’s full potential with your own data.
Understanding LangChain Document Loaders
Before we dive into building custom loaders, let’s understand what a “Document” means in LangChain. In LangChain, a Document is a simple container. It holds a piece of text content and some extra information about that text.
This extra information is called “metadata.” Metadata can be anything useful, like the source file name or a creation date. Think of a Document as a page from a book, with the page’s text as page_content and the book’s title or chapter number as metadata.
Document loaders are the tools that create these Document objects from your raw data. They read from a file, a database, or even a website. Then, they package the content and its details into LangChain Document objects.
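For example, here is a minimal sketch of building a single Document by hand; the content and metadata values are invented for illustration:

from langchain_core.documents import Document

doc = Document(
    page_content="LangChain loaders turn raw data into Document objects.",
    metadata={"source": "intro_notes.txt", "chapter": 1},
)

print(doc.page_content)
print(doc.metadata["source"])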
The Role of the BaseLoader Class
Every document loader in LangChain starts from a special blueprint called BaseLoader. This is a Python class that provides the basic structure for all loaders. It sets the rules for how a loader should behave.
Understanding this BaseLoader class overview is key to implementing custom loaders. You won’t usually use BaseLoader directly, but you will build your own loader by extending it. Think of it like a general recipe for baking, where you then add your specific ingredients to make a cake or cookies.
By inheriting from BaseLoader, your custom loader automatically gets some expected behaviors. This ensures it works smoothly with other parts of LangChain. It makes sure your loader fits right into the LangChain ecosystem.
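As a preview, a custom loader is usually just a small class that extends BaseLoader, roughly like this minimal skeleton (the methods involved are explained in detail in the next sections):

from typing import Iterator

from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document


class MySourceLoader(BaseLoader):
    """Skeleton of a custom loader; fill lazy_load() with your own reading logic."""

    def lazy_load(self) -> Iterator[Document]:
        # Read your data source here and yield one Document per item.
        yield Document(page_content="...", metadata={"source": "my-source"})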
The Anatomy of a BaseLoader: load() and lazy_load()
When you create a custom document loader in LangChain, you need to implement at least one important method. These methods tell LangChain how to get the data. The two main methods are load() and lazy_load().
Let’s explore what each one does. They both return a list of Document objects, but they do it in different ways. Knowing the difference helps you choose the right one for your needs.
The load() Method
The load() method is the most straightforward way to get documents. When you call load(), it fetches all the data at once. It processes everything and returns a complete list of Document objects.
Imagine you have a small folder of text files. The load() method would open all files, read their content, and return a list of Document objects for every single file. This is great for smaller datasets or when you need all documents available immediately.
However, for very large datasets, loading everything at once can use a lot of memory. It might also make your program slow to start. This is why the lazy_load() method exists.
Here’s a simple example of a custom loader using load(). This loader will create documents from a simple Python list.
from langchain_core.documents import Document
from langchain_core.document_loaders import BaseLoader


class SimpleListLoader(BaseLoader):
    """A custom document loader for a list of strings."""

    def __init__(self, texts: list[str]):
        """
        Initialize the loader with a list of text strings.

        Args:
            texts: A list of strings, where each string will become a Document's page_content.
        """
        self.texts = texts

    def load(self) -> list[Document]:
        """
        Load documents from the initialized list of texts.

        Returns:
            A list of Document objects.
        """
        documents = []
        for i, text in enumerate(self.texts):
            # Create a Document for each text, adding a simple source metadata
            doc = Document(page_content=text, metadata={"source": f"list_item_{i}"})
            documents.append(doc)
        return documents


# Let's test our simple custom document loader
if __name__ == "__main__":
    my_data = [
        "The first paragraph of my data.",
        "Here is the second paragraph, full of useful information.",
        "And finally, the third part. This concludes the simple example.",
    ]

    # Create an instance of our custom loader
    loader = SimpleListLoader(my_data)

    # Load the documents
    docs = loader.load()

    # Print the loaded documents
    print(f"Loaded {len(docs)} documents.")
    for doc in docs:
        print(f"Content: '{doc.page_content}'")
        print(f"Metadata: {doc.metadata}")
        print("-" * 20)
In this code, you can see how SimpleListLoader inherits from BaseLoader. It has an __init__ method to set up the data it will load. The load() method then goes through each item, creates a Document, and gathers them all into a list.
The lazy_load() Method
The lazy_load() method is different because it uses a Python feature called a “generator.” Instead of creating all documents at once, lazy_load() creates and yields documents one by one. It only creates a document when you ask for it.
Think of it like a conveyor belt in a factory. load() waits until all products are finished and then sends them in one big box. lazy_load() sends each product down the conveyor belt as soon as it’s ready. This means your program can start working with the first document even before all documents are processed.
This method is especially useful for very large files or data streams. It saves memory and can make your application feel faster. This is a key part of deciding between the lazy_load() and load() methods.
Here’s how you might implement lazy_load() for our SimpleListLoader:
from langchain_core.documents import Document
from langchain_core.document_loaders import BaseLoader
from typing import Iterator


class SimpleListLazyLoader(BaseLoader):
    """A custom document loader for a list of strings, using lazy_load."""

    def __init__(self, texts: list[str]):
        """
        Initialize the loader with a list of text strings.

        Args:
            texts: A list of strings, where each string will become a Document's page_content.
        """
        self.texts = texts

    def lazy_load(self) -> Iterator[Document]:
        """
        Lazily load documents from the initialized list of texts.

        Yields:
            Document objects one by one.
        """
        for i, text in enumerate(self.texts):
            # Create a Document for each text, adding a simple source metadata
            doc = Document(page_content=text, metadata={"source": f"list_item_{i}_lazy"})
            yield doc  # The 'yield' keyword makes this a generator


# Let's test our simple custom document loader with lazy_load
if __name__ == "__main__":
    my_data_lazy = [
        "This is the first lazy loaded document content.",
        "The second lazy document is here, ready to be processed.",
        "And the final lazy document is now available.",
    ]

    # Create an instance of our custom lazy loader
    lazy_loader = SimpleListLazyLoader(my_data_lazy)

    print("Starting lazy loading...")
    # Iterate through the documents; they are loaded one by one
    for i, doc in enumerate(lazy_loader.lazy_load()):
        print(f"Processing lazy document {i + 1}:")
        print(f"  Content: '{doc.page_content}'")
        print(f"  Metadata: {doc.metadata}")
        print("-" * 20)
    print("Finished lazy loading.")
You can see the main difference is the yield keyword. This turns the method into a generator. When you iterate over lazy_loader.lazy_load(), it produces documents one by one, saving memory and allowing for immediate processing.
When to Use Which: lazy_load vs load methods
The choice between load() and lazy_load() depends on your specific situation.
- Use load() when:
  - Your dataset is small.
  - You need all documents in memory at once (e.g., for batch processing).
  - The overhead of creating a generator is not necessary.
- Use lazy_load() when:
  - Your dataset is very large, and loading it all at once would consume too much memory.
  - You want to start processing documents as soon as they are available, without waiting for everything.
  - You are dealing with streaming data or infinite sources.
For most custom LangChain document loader implementations, lazy_load() is often preferred: it is more memory-efficient and flexible. If you implement lazy_load(), LangChain automatically provides a basic load() implementation that collects all documents from lazy_load() into a list, as the short sketch below shows. You can still implement both if you need a load() behavior that differs from simply collecting the lazy_load() results.
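A minimal sketch of that behavior, reusing the SimpleListLazyLoader defined above (which only implements lazy_load()):

loader = SimpleListLazyLoader(["alpha", "beta", "gamma"])

# lazy_load() yields Documents one at a time...
first = next(iter(loader.lazy_load()))
print(first.page_content)  # "alpha"

# ...while load(), inherited from BaseLoader, collects them all into a list.
all_docs = loader.load()
print(len(all_docs))  # 3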
Designing Your Document Schema
The way you structure your Document objects is very important. This is called document schema design. A well-designed schema makes your documents easy to work with later, especially when you use them with language models. LangChain Document objects have two main parts: page_content and metadata.
page_content is where the main text of your document goes. This is the information that the language model will actually read and understand. It should be clean and relevant text.
metadata is a Python dictionary that stores extra information about the page_content. This could include things like the source file name, the author, the date it was created, or even a summary. Well-structured metadata helps you filter, sort, and understand your documents better. For instance, if you want to find all documents written by “Alice,” you can easily do so if “author” is in your metadata.
Let’s look at an example. Imagine you’re loading articles from different websites. Your page_content would be the article text. Your metadata could include:
- source_url: The link to the original article.
- published_date: When the article was posted.
- author: Who wrote the article.
- category: Like “Technology” or “Sports.”
Good document schema design means thinking about what information will be useful. What might you want to search by? What helps the language model understand the context?
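For instance, an article Document following the schema above might look like this (the values are invented for illustration):

from langchain_core.documents import Document

article = Document(
    page_content="Full text of the article goes here...",
    metadata={
        "source_url": "https://example.com/articles/42",
        "published_date": "2023-10-26",
        "author": "Alice",
        "category": "Technology",
    },
)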
Here’s an example of creating a Document with useful metadata from a simple text file:
from langchain_core.documents import Document
import os


def create_document_from_file(file_path: str) -> Document | None:
    """
    Creates a Document object from a text file, extracting filename as metadata.

    Args:
        file_path: The path to the text file.

    Returns:
        A Document object, or None if the file could not be read.
    """
    try:
        with open(file_path, 'r', encoding='utf-8') as f:
            content = f.read()

        # Extract metadata
        file_name = os.path.basename(file_path)
        file_size = os.path.getsize(file_path)  # Example of more metadata

        metadata = {
            "source": file_name,
            "path": file_path,
            "size": file_size,
            "type": "text/plain"
        }
        return Document(page_content=content, metadata=metadata)
    except FileNotFoundError:
        print(f"Error: File not found at {file_path}")
        return None
    except Exception as e:
        print(f"An error occurred while reading {file_path}: {e}")
        return None


# Example usage:
if __name__ == "__main__":
    # Create a dummy text file
    with open("example_article.txt", "w", encoding='utf-8') as f:
        f.write("This is the main content of a sample article. It talks about document loaders and their importance in AI applications.")
        f.write("\n\nThis article was created on 2023-10-26 by LangChainBot.")

    # Create a Document from the file
    article_doc = create_document_from_file("example_article.txt")

    if article_doc:
        print("--- Loaded Document ---")
        print(f"Page Content: {article_doc.page_content[:100]}...")  # Show first 100 chars
        print(f"Metadata: {article_doc.metadata}")
        print("-" * 30)

    # Clean up the dummy file
    os.remove("example_article.txt")
Here, the source file name, path, size, and type are all valuable pieces of metadata. They help you quickly understand where the page_content came from without reading the whole text. This makes your custom LangChain document loaders much more useful.
Step-by-Step: Implementing a Custom Loader
Now, let’s build a more complex custom LangChain document loader from scratch. Imagine you have data stored in a simple JSON Lines file. Each line is a JSON object representing an article. We want to load these articles and extract specific fields as metadata.
Setting Up Your Environment
First, you’ll need Python installed on your computer. Make sure you have langchain-core installed, which provides the Document and BaseLoader classes. You can install it using pip.
pip install langchain-core
You’ll also need a text editor or an Integrated Development Environment (IDE) like VS Code. This will help you write and run your Python code.
Inheriting from BaseLoader
The first step for implementing custom loaders is to create a new Python class that inherits from BaseLoader. This tells Python that your class is a type of LangChain document loader.
from langchain_core.documents import Document
from langchain_core.document_loaders import BaseLoader
from typing import Iterator, List, Dict, Any
import json
import os


class JsonLinesLoader(BaseLoader):
    """
    A custom document loader for JSON Lines files.
    Each line in the file is expected to be a valid JSON object.
    """

    def __init__(self, file_path: str, page_content_key: str, metadata_keys: List[str] = None):
        """
        Initializes the JsonLinesLoader.

        Args:
            file_path: The path to the .jsonl file.
            page_content_key: The key in the JSON object whose value will be used as page_content.
            metadata_keys: An optional list of keys whose values will be extracted as metadata.
        """
        self.file_path = file_path
        self.page_content_key = page_content_key
        self.metadata_keys = metadata_keys if metadata_keys is not None else []
In this __init__ method, you’re setting up what your loader needs to know. It needs the file_path to your JSON Lines file. It also needs to know which JSON field should become the main page_content. Lastly, it takes an optional list of metadata_keys for custom metadata extraction.
Implementing load()
Now, let’s implement the load() method for our JsonLinesLoader. This method will read the file, parse each JSON line, and create Document objects. We will focus on custom metadata extraction to get relevant information.
# (Continuing from the previous code block, inside the JsonLinesLoader class)

    def load(self) -> List[Document]:
        """
        Load documents from the specified JSON Lines file.

        Returns:
            A list of Document objects.
        """
        documents = []
        try:
            with open(self.file_path, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f):
                    try:
                        data = json.loads(line.strip())

                        # Extract page_content
                        page_content = data.get(self.page_content_key, "")

                        # Extract metadata
                        metadata = {"source": os.path.basename(self.file_path), "line": line_num + 1}
                        for key in self.metadata_keys:
                            if key in data:
                                metadata[key] = data[key]

                        # Create and append the Document
                        documents.append(Document(page_content=page_content, metadata=metadata))
                    except json.JSONDecodeError as e:
                        print(f"Error decoding JSON on line {line_num + 1} of {self.file_path}: {e}")
                        # Optionally, skip or log malformed lines
                    except Exception as e:
                        print(f"An unexpected error occurred processing line {line_num + 1} of {self.file_path}: {e}")
            return documents
        except FileNotFoundError:
            print(f"Error: File not found at {self.file_path}")
            return []
        except Exception as e:
            print(f"An error occurred while reading the file {self.file_path}: {e}")
            return []
Here’s how this load() method works:
- It opens the file specified by self.file_path.
- It reads the file line by line.
- For each line, it tries to parse it as a JSON object using json.loads().
- It extracts the value corresponding to page_content_key for the document's main content.
- It creates a metadata dictionary. It always includes the source filename and the line number. Then, it adds any other keys you specified in metadata_keys from the JSON data.
- Finally, it creates a Document object and adds it to the documents list. This process is repeated for every line in the file.
Implementing lazy_load() for Efficiency
For very large JSON Lines files, loading everything into memory at once with load() might not be efficient. This is where lazy_load() shines. Let’s add the lazy_load() method to our JsonLinesLoader.
# (Continuing from the previous code block, inside the JsonLinesLoader class)

    def lazy_load(self) -> Iterator[Document]:
        """
        Lazily load documents from the specified JSON Lines file.

        Yields:
            Document objects one by one.
        """
        try:
            with open(self.file_path, 'r', encoding='utf-8') as f:
                for line_num, line in enumerate(f):
                    try:
                        data = json.loads(line.strip())
                        page_content = data.get(self.page_content_key, "")
                        metadata = {"source": os.path.basename(self.file_path), "line": line_num + 1}
                        for key in self.metadata_keys:
                            if key in data:
                                metadata[key] = data[key]
                        yield Document(page_content=page_content, metadata=metadata)
                    except json.JSONDecodeError as e:
                        print(f"Error decoding JSON on line {line_num + 1} of {self.file_path}: {e}")
                        # You might choose to yield an error document or skip
                    except Exception as e:
                        print(f"An unexpected error occurred processing line {line_num + 1} of {self.file_path}: {e}")
        except FileNotFoundError:
            print(f"Error: File not found at {self.file_path}")
        except Exception as e:
            print(f"An error occurred while reading the file {self.file_path}: {e}")


# Example usage for the JsonLinesLoader
if __name__ == "__main__":
    # Create a dummy JSON Lines file for testing
    jsonl_data = [
        json.dumps({"id": 1, "title": "First Article", "content": "This is the content of the first test article.", "author": "Alice"}),
        json.dumps({"id": 2, "title": "Second Article", "content": "The second article provides more details.", "published_date": "2023-10-25"}),
        json.dumps({"id": 3, "summary": "This is a summary, but we want 'content' as page_content.", "content": "Third article's content.", "author": "Bob"}),
        "THIS IS NOT A VALID JSON LINE",  # Introduce a malformed line
        json.dumps({"id": 4, "title": "Fourth Article", "content": "Final article content.", "author": "Alice", "category": "Tech"})
    ]
    with open("articles.jsonl", "w", encoding='utf-8') as f:
        for line in jsonl_data:
            f.write(line + "\n")

    print("\n--- Testing JsonLinesLoader with load() ---")
    loader_load = JsonLinesLoader(
        file_path="articles.jsonl",
        page_content_key="content",
        metadata_keys=["id", "title", "author", "published_date", "category"]
    )
    docs_load = loader_load.load()
    print(f"Loaded {len(docs_load)} documents using load().")
    for doc in docs_load:
        print(f"  Content (first 50 chars): '{doc.page_content[:50]}'")
        print(f"  Metadata: {doc.metadata}")
        print("-" * 10)

    print("\n--- Testing JsonLinesLoader with lazy_load() ---")
    loader_lazy = JsonLinesLoader(
        file_path="articles.jsonl",
        page_content_key="content",
        metadata_keys=["id", "title", "author", "published_date", "category"]
    )
    # Iterate over the lazy loader
    for i, doc in enumerate(loader_lazy.lazy_load()):
        print(f"Processing lazy document {i + 1}:")
        print(f"  Content (first 50 chars): '{doc.page_content[:50]}'")
        print(f"  Metadata: {doc.metadata}")
        print("-" * 10)

    # Clean up the dummy file
    os.remove("articles.jsonl")
The lazy_load() method is very similar to load(). The key difference is that it uses the yield keyword instead of appending to a list. This makes it a generator, enabling streaming document loading. You can also see that we added some basic error handling to catch bad JSON lines.
Advanced Concepts for Custom Loaders
Now that you have a solid understanding of basic custom LangChain document loaders, let’s explore some more advanced topics. These will help you make your loaders more robust and versatile. You will learn about better metadata extraction, error handling, and asynchronous operations.
Custom Metadata Extraction Techniques
Extracting the right metadata is crucial for the usefulness of your documents. Simple key-value extraction, like we did with JSON, is a good start. However, real-world data can be messy. You might need more sophisticated custom metadata extraction techniques.
You could use regular expressions to find patterns in text, for example to pull out dates or specific IDs from a plain text file. Libraries like BeautifulSoup are excellent for parsing HTML and extracting data from web pages, and pandas can help you process structured data from CSV or Excel files.
Consider a scenario where you have plain text files, and you want to extract a “Date” and “Author” from a specific line.
import re
from datetime import datetime
from typing import Any, Dict


def extract_metadata_from_text(text_content: str) -> Dict[str, Any]:
    """
    Extracts date and author from a text string using regular expressions.
    Assumes date is in YYYY-MM-DD format and author follows "Author: ".
    """
    metadata = {}

    # Example 1: Extract Date
    date_match = re.search(r"Date: (\d{4}-\d{2}-\d{2})", text_content)
    if date_match:
        try:
            metadata["document_date"] = datetime.strptime(date_match.group(1), "%Y-%m-%d").date()
        except ValueError:
            pass  # Handle invalid date format if necessary

    # Example 2: Extract Author
    author_match = re.search(r"Author: (.*)", text_content)
    if author_match:
        metadata["document_author"] = author_match.group(1).strip()

    return metadata


# Example usage
if __name__ == "__main__":
    sample_text = """
    This is an important research paper.
    It discusses new findings in AI.
    Date: 2023-11-01
    Version: 1.0
    Author: Dr. Sophia Lee
    """
    extracted = extract_metadata_from_text(sample_text)
    print("Extracted Metadata:")
    print(extracted)
For even more complex cases, you might want to use smaller language models or rules-based systems to extract information. This ensures your metadata is rich and consistent across your documents. The better your metadata, the easier it is for LangChain to find and use your documents effectively.
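Since BeautifulSoup was mentioned above, here is a small sketch of HTML metadata extraction; it assumes the page follows common conventions (a title tag and a meta author tag) and requires pip install beautifulsoup4:

from typing import Any, Dict

from bs4 import BeautifulSoup  # pip install beautifulsoup4


def extract_metadata_from_html(html: str) -> Dict[str, Any]:
    """Pull a few common fields out of an HTML page to use as Document metadata."""
    soup = BeautifulSoup(html, "html.parser")
    metadata: Dict[str, Any] = {}

    if soup.title and soup.title.string:
        metadata["title"] = soup.title.string.strip()

    # <meta name="author" content="..."> is a common, but not universal, convention.
    author_tag = soup.find("meta", attrs={"name": "author"})
    if author_tag and author_tag.get("content"):
        metadata["author"] = author_tag["content"]

    return metadata


html = "<html><head><title>Loader Notes</title><meta name='author' content='Alice'></head><body>...</body></html>"
print(extract_metadata_from_html(html))  # {'title': 'Loader Notes', 'author': 'Alice'}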
Error Handling in Loaders
Robust error handling in loaders is absolutely critical. Data sources are often unreliable. Files might be missing, network connections might fail, or data might be malformed. Your loader should handle these situations gracefully without crashing.
You should always use try-except blocks around operations that might fail. This includes file operations, network requests, and data parsing. When an error occurs, you can choose to:
- Skip the problematic item: If one document is bad, you might still want to load the others.
- Log the error: Write a message to a log file or the console. This helps you debug issues later.
- Return a partial list of documents: Let the user know some documents couldn’t be loaded.
- Raise a custom exception: If the error is critical and prevents any loading, you might stop the process entirely.
In our JsonLinesLoader, we already added basic try-except blocks. These catch FileNotFoundError and json.JSONDecodeError. Good error messages are also important to help you understand what went wrong.
Consider this example with more detailed error reporting:
from langchain_core.documents import Document
from langchain_core.document_loaders import BaseLoader
import os
import json
from typing import Iterator, List, Dict, Any


class RobustJsonLinesLoader(BaseLoader):
    """
    A JsonLinesLoader with more detailed error handling.
    """

    def __init__(self, file_path: str, page_content_key: str, metadata_keys: List[str] = None):
        self.file_path = file_path
        self.page_content_key = page_content_key
        self.metadata_keys = metadata_keys if metadata_keys is not None else []
        self.errors = []  # To store details of errors encountered

    def _process_line(self, line: str, line_num: int) -> Document | None:
        """Helper to process a single JSON line and create a Document."""
        try:
            data = json.loads(line.strip())
            page_content = data.get(self.page_content_key, "")
            metadata = {"source": os.path.basename(self.file_path), "line": line_num}
            for key in self.metadata_keys:
                if key in data:
                    metadata[key] = data[key]
            return Document(page_content=page_content, metadata=metadata)
        except json.JSONDecodeError as e:
            error_msg = f"JSON decoding error on line {line_num}: {e}. Line content: '{line.strip()[:100]}...'"
            self.errors.append({"line": line_num, "error_type": "JSONDecodeError", "message": error_msg})
            print(f"Warning: {error_msg}")
            return None
        except Exception as e:
            error_msg = f"Unexpected error processing line {line_num}: {e}. Line content: '{line.strip()[:100]}...'"
            self.errors.append({"line": line_num, "error_type": "GenericError", "message": error_msg})
            print(f"Warning: {error_msg}")
            return None

    def lazy_load(self) -> Iterator[Document]:
        """
        Lazily load documents, storing errors found.
        """
        self.errors = []  # Reset errors for a new load operation
        try:
            with open(self.file_path, 'r', encoding='utf-8') as f:
                for line_num_1_indexed, line in enumerate(f, 1):  # Start line_num from 1
                    doc = self._process_line(line, line_num_1_indexed)
                    if doc:
                        yield doc
        except FileNotFoundError:
            error_msg = f"Error: File not found at {self.file_path}"
            self.errors.append({"error_type": "FileNotFoundError", "message": error_msg})
            print(error_msg)
        except Exception as e:
            error_msg = f"An unexpected file reading error occurred for {self.file_path}: {e}"
            self.errors.append({"error_type": "FileReadError", "message": error_msg})
            print(error_msg)

    def load(self) -> List[Document]:
        """
        Load all documents, storing errors found.
        """
        return list(self.lazy_load())  # Leveraging lazy_load for the load method


# Example usage with errors
if __name__ == "__main__":
    # Create a dummy JSON Lines file with some errors
    jsonl_data_with_errors = [
        json.dumps({"id": 1, "content": "Good data line 1."}),
        "NOT A JSON OBJECT",  # Malformed JSON
        json.dumps({"id": 2, "content": "Good data line 2.", "author": "Charlie"}),
        "{id: 3, content: 'Invalid JSON syntax'}",  # Another malformed JSON
        json.dumps({"id": 4, "content": "Good data line 3."})
    ]
    with open("articles_with_errors.jsonl", "w", encoding='utf-8') as f:
        for line in jsonl_data_with_errors:
            f.write(line + "\n")

    print("\n--- Testing RobustJsonLinesLoader ---")
    robust_loader = RobustJsonLinesLoader(
        file_path="articles_with_errors.jsonl",
        page_content_key="content",
        metadata_keys=["id", "author"]
    )

    loaded_docs = robust_loader.load()
    print(f"Successfully loaded {len(loaded_docs)} documents.")

    if robust_loader.errors:
        print("\n--- Errors Encountered ---")
        for error in robust_loader.errors:
            print(f"- {error['error_type']} at line {error.get('line', 'N/A')}: {error['message']}")

    # Clean up
    os.remove("articles_with_errors.jsonl")
This RobustJsonLinesLoader now tracks errors in an errors list. This gives you a clear report of what went wrong, which is invaluable for debugging and data quality checks.
Asynchronous Loader Implementation
For tasks that involve waiting, like fetching data from many websites, async loader implementation can make your loader much faster. Python’s asyncio library allows you to run multiple I/O-bound tasks concurrently. This means your loader doesn’t have to wait for one network request to finish before starting another.
To make a loader asynchronous, you typically implement an aload() or alazy_load() method. These methods use async def and await. You’ll need libraries like aiohttp for async web requests.
Here’s a conceptual example for an async loader fetching data from multiple URLs:
import asyncio
import aiohttp
from langchain_core.documents import Document
from langchain_core.document_loaders import BaseLoader
from typing import AsyncIterator, List, Dict, Any


class AsyncURLLoader(BaseLoader):
    """A custom asynchronous document loader for URLs."""

    def __init__(self, urls: List[str]):
        """
        Initializes the loader with a list of URLs to fetch.

        Args:
            urls: A list of URLs.
        """
        self.urls = urls

    async def _fetch_url(self, session: aiohttp.ClientSession, url: str) -> Document | None:
        """Helper to asynchronously fetch content from a single URL."""
        try:
            async with session.get(url, timeout=10) as response:
                response.raise_for_status()  # Raise an exception for bad status codes
                content = await response.text()
                metadata = {"source": url, "status_code": response.status}
                # You could parse content here (e.g., with BeautifulSoup) for more metadata
                return Document(page_content=content, metadata=metadata)
        except aiohttp.ClientError as e:
            print(f"Error fetching {url}: {e}")
            return None
        except asyncio.TimeoutError:
            print(f"Timeout fetching {url}")
            return None
        except Exception as e:
            print(f"An unexpected error occurred for {url}: {e}")
            return None

    async def alazy_load(self) -> AsyncIterator[Document]:
        """
        Asynchronously and lazily load documents from URLs.

        Yields:
            Document objects one by one as they are fetched.
        """
        async with aiohttp.ClientSession() as session:
            tasks = [self._fetch_url(session, url) for url in self.urls]
            for task in asyncio.as_completed(tasks):
                doc = await task
                if doc:
                    yield doc

    async def aload(self) -> List[Document]:
        """
        Asynchronously load all documents from URLs.

        Returns:
            A list of Document objects.
        """
        return [doc async for doc in self.alazy_load()]


# Example usage for the AsyncURLLoader
if __name__ == "__main__":
    # You will need actual URLs that are publicly accessible
    # Using example.com for demonstration, which might not be rich in content
    test_urls = [
        "https://www.example.com",
        "https://httpbin.org/status/200",  # Returns 200 status
        "https://httpbin.org/status/404",  # Returns 404 Not Found
        "https://www.python.org",
        "https://www.invalid-url-to-fail.com"  # This URL will likely cause an error
    ]

    async def main():
        loader = AsyncURLLoader(test_urls)

        print("\n--- Async Loading (aload) ---")
        async_docs = await loader.aload()
        print(f"Loaded {len(async_docs)} documents asynchronously.")
        for doc in async_docs:
            print(f"  Source: {doc.metadata.get('source', 'N/A')}, Status: {doc.metadata.get('status_code', 'N/A')}")
            # print(f"  Content (first 100 chars): '{doc.page_content[:100]}...'")
            print("-" * 10)

        print("\n--- Async Lazy Loading (alazy_load) ---")
        # enumerate() does not work with async iterators, so keep a manual counter
        i = 0
        async for doc in loader.alazy_load():
            i += 1
            print(f"Processing async lazy document {i}:")
            print(f"  Source: {doc.metadata.get('source', 'N/A')}, Status: {doc.metadata.get('status_code', 'N/A')}")
            print("-" * 10)

    # To run an async function
    # You might need to install aiohttp: pip install aiohttp
    asyncio.run(main())
This example shows an asynchronous loader implementation where _fetch_url fetches the data for a single URL. alazy_load then uses asyncio.as_completed to yield documents from multiple URLs as each request finishes. This significantly speeds up data loading from many external sources.
Streaming Document Loading
Streaming document loading is naturally achieved with the lazy_load() (or alazy_load() for async) methods. As we discussed, these methods yield documents one by one, rather than returning them all at once. This is fundamental for processing large or continuous data streams.
Imagine you are monitoring a live feed of social media posts. You don’t want to wait until an hour’s worth of posts accumulates before processing them. With streaming document loading via lazy_load(), you can process each post as it arrives. This is also essential when working with files too large to fit into memory, like massive log files.
The yield keyword makes this possible. When a lazy_load() method yields a document, your program can immediately start working with that document. The loader then pauses until the next document is requested. This continuous flow of documents is what “streaming” means in this context.
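As a minimal sketch of this idea, the loader below streams a potentially huge log file line by line; the file name server.log is hypothetical, and each line becomes a Document only when the consumer asks for the next one:

from typing import Iterator

from langchain_core.document_loaders import BaseLoader
from langchain_core.documents import Document


class LogFileStreamLoader(BaseLoader):
    """Yields one Document per log line without reading the whole file into memory."""

    def __init__(self, file_path: str):
        self.file_path = file_path

    def lazy_load(self) -> Iterator[Document]:
        with open(self.file_path, "r", encoding="utf-8") as f:
            for line_num, line in enumerate(f, 1):
                yield Document(
                    page_content=line.rstrip("\n"),
                    metadata={"source": self.file_path, "line": line_num},
                )


# Each line is processed as soon as it is read; only one line is held in memory at a time.
for doc in LogFileStreamLoader("server.log").lazy_load():
    if "ERROR" in doc.page_content:
        print(doc.metadata["line"], doc.page_content)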
Testing Your Custom Loaders
After building your custom LangChain document loaders, testing them is a crucial step. You need to make sure they work correctly, handle different data types, and manage errors gracefully. Writing tests helps you catch bugs early.
You should write “unit tests” for your loader. A unit test checks a small, isolated piece of your code. For a loader, this means testing load() and lazy_load() methods.
Here are some things to test:
- Correct document creation: Does it create Document objects with the right page_content and metadata?
- Empty input: What happens if the input file is empty or the data source has no items?
- Malformed data: Does it handle incorrect JSON, bad file formats, or missing fields without crashing? (This is where your error handling is tested.)
- Large datasets: Does lazy_load() correctly handle many items without excessive memory use?
- Edge cases: Test data with unusual characters, very long strings, or missing expected metadata keys.
You can use Python’s built-in unittest module or a popular testing framework like pytest. Here’s an example using pytest for our RobustJsonLinesLoader.
First, install pytest:
pip install pytest
Then, create a test file, e.g., test_json_loader.py:
import pytest
import os
import json
from langchain_core.documents import Document
from your_module import RobustJsonLinesLoader  # Assuming RobustJsonLinesLoader is in 'your_module.py'


# Helper fixture to create a dummy JSONL file for tests
@pytest.fixture
def create_dummy_jsonl(tmp_path):
    def _creator(filename="test_data.jsonl", data=[]):
        file_path = tmp_path / filename
        with open(file_path, "w", encoding="utf-8") as f:
            for item in data:
                if isinstance(item, str):
                    f.write(item + "\n")  # Write raw strings as-is so tests can inject malformed lines
                else:
                    f.write(json.dumps(item) + "\n")
        return str(file_path)
    return _creator


def test_load_valid_data(create_dummy_jsonl):
    """Test loading valid JSONL data."""
    test_data = [
        {"id": 1, "title": "Article One", "content": "Content of article one."},
        {"id": 2, "title": "Article Two", "content": "Content of article two."}
    ]
    file_path = create_dummy_jsonl(data=test_data)
    loader = RobustJsonLinesLoader(file_path, page_content_key="content", metadata_keys=["id", "title"])
    docs = loader.load()

    assert len(docs) == 2
    assert isinstance(docs[0], Document)
    assert docs[0].page_content == "Content of article one."
    assert docs[0].metadata["id"] == 1
    assert docs[0].metadata["title"] == "Article One"
    assert "source" in docs[0].metadata
    assert "line" in docs[0].metadata


def test_lazy_load_valid_data(create_dummy_jsonl):
    """Test lazy loading valid JSONL data."""
    test_data = [
        {"id": 1, "title": "Lazy Article One", "content": "Lazy content of article one."},
        {"id": 2, "title": "Lazy Article Two", "content": "Lazy content of article two."}
    ]
    file_path = create_dummy_jsonl(data=test_data)
    loader = RobustJsonLinesLoader(file_path, page_content_key="content", metadata_keys=["id", "title"])
    docs_iter = loader.lazy_load()
    docs = list(docs_iter)  # Convert generator to list for assertions

    assert len(docs) == 2
    assert docs[0].page_content == "Lazy content of article one."
    assert docs[1].metadata["id"] == 2


def test_load_with_missing_page_content_key(create_dummy_jsonl):
    """Test handling of missing page_content key."""
    test_data = [
        {"id": 1, "title": "Only Title"},
        {"id": 2, "title": "Another Title", "content": "Valid content"}
    ]
    file_path = create_dummy_jsonl(data=test_data)
    loader = RobustJsonLinesLoader(file_path, page_content_key="content")
    docs = loader.load()

    assert len(docs) == 2
    assert docs[0].page_content == ""  # Expect empty string if key is missing
    assert docs[1].page_content == "Valid content"
    assert not loader.errors  # No errors expected for missing key, just empty string


def test_load_with_malformed_json(create_dummy_jsonl, capsys):
    """Test handling of malformed JSON lines."""
    test_data = [
        {"id": 1, "content": "Good line."},
        "THIS IS NOT JSON",
        {"id": 2, "content": "Another good line."}
    ]
    file_path = create_dummy_jsonl(data=test_data)
    loader = RobustJsonLinesLoader(file_path, page_content_key="content")
    docs = loader.load()

    assert len(docs) == 2  # Only good lines should be loaded
    assert len(loader.errors) == 1
    assert "JSON decoding error" in loader.errors[0]["message"]

    # Check if a warning was printed (capsys fixture captures stdout/stderr)
    captured = capsys.readouterr()
    assert "Warning: JSON decoding error" in captured.out


def test_load_non_existent_file():
    """Test loading from a file that does not exist."""
    loader = RobustJsonLinesLoader("non_existent_file.jsonl", page_content_key="content")
    docs = loader.load()

    assert len(docs) == 0
    assert len(loader.errors) == 1
    assert "FileNotFoundError" in loader.errors[0]["error_type"]
    assert "File not found" in loader.errors[0]["message"]
To run these tests, navigate to the directory containing test_json_loader.py and run pytest. Remember to replace your_module with the actual name of the Python file where RobustJsonLinesLoader is defined. These tests ensure your loader works as expected in various scenarios.
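For example, assuming the test file name used above:

pytest test_json_loader.py -v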
Packaging and Sharing Your Custom Loaders
Once you’ve built and thoroughly tested your custom LangChain document loaders, you might want to share them or reuse them in different projects. This requires packaging your custom loaders. Packaging means putting your code into a format that others can easily install and use.
The standard way to package Python code is to create a Python package. This involves a specific directory structure and a setup.py or pyproject.toml file. These files tell Python how to build and install your package.
Here’s a basic project structure for packaging custom loaders:
my_langchain_loaders/
├── my_langchain_loaders/
│   ├── __init__.py
│   ├── json_lines.py        # Your RobustJsonLinesLoader class would be here
│   └── custom_html.py       # Another custom loader you might create
├── tests/
│   └── test_json_lines.py   # Your pytest tests
├── README.md
├── setup.py                 # Or pyproject.toml
└── requirements.txt
Inside setup.py, you’d define your package name, version, and dependencies.
# setup.py
from setuptools import setup, find_packages

setup(
    name="my-langchain-loaders",
    version="0.1.0",
    author="Your Name",
    author_email="your.email@example.com",
    description="Custom document loaders for LangChain",
    long_description=open("README.md").read(),
    long_description_content_type="text/markdown",
    url="https://github.com/yourusername/my-langchain-loaders",  # Replace with your repo
    packages=find_packages(),  # This automatically finds your 'my_langchain_loaders' directory
    install_requires=[
        "langchain-core>=0.1.0",  # Minimum LangChain core version
        "aiohttp>=3.0.0",  # If you use async loaders
        # Add any other dependencies your loaders need (e.g., BeautifulSoup)
    ],
    classifiers=[
        "Programming Language :: Python :: 3",
        "License :: OSI Approved :: MIT License",
        "Operating System :: OS Independent",
    ],
    python_requires=">=3.10",  # The loaders above use the "X | None" union syntax, which needs Python 3.10+
)
To create a distributable package, you would run:
python setup.py sdist bdist_wheel
This creates .tar.gz and .whl files in a dist/ directory. (On newer setuptools versions you may see a deprecation warning for this command; the modern equivalent is pip install build followed by python -m build.) You can then share these files, or even upload them to PyPI (the Python Package Index). Once packaged, others can install your loaders using pip install my-langchain-loaders. This makes it simple to integrate your custom LangChain document loaders into any LangChain project.
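Once installed, importing a loader is an ordinary Python import; the module and class names below match the hypothetical package layout shown earlier:

from my_langchain_loaders.json_lines import RobustJsonLinesLoader

loader = RobustJsonLinesLoader(
    "articles.jsonl",
    page_content_key="content",
    metadata_keys=["id", "author"],
)
docs = loader.load()
print(f"Loaded {len(docs)} documents")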
Conclusion
You’ve now taken a deep dive into building custom document loaders in LangChain. You started with the basics of the Document structure and the BaseLoader class. Then, you learned the critical difference between the lazy_load() and load() methods and how to apply each.
You explored document schema design for rich metadata and practiced implementing custom loaders with practical code examples. We covered advanced topics like custom metadata extraction, robust error handling in loaders, and efficient async loader implementation. You also understand the power of streaming document loading.
Finally, you learned how to test your custom loaders to ensure their reliability and how to package them for sharing. With these skills, you are ready to connect LangChain to virtually any data source, no matter how unique.
Building your own loaders gives you immense control and flexibility. It lets you tailor LangChain applications to your specific data needs. Keep experimenting with different data formats and extraction techniques. Your ability to integrate diverse data sources will make your LangChain applications truly stand out.
If you’re interested in more basic LangChain concepts, you might find our Introduction to LangChain blog post helpful. Or, for deeper dives into specific data sources, check out Loading Data from Databases with LangChain.