The webcrawler-source agent crawls a website and outputs the site's URL and an HTML document. Crawling a website is an ideal first step in a text embeddings pipeline.
This agent keeps the crawl status in persistent storage. The storage does not hold a copy of the crawled data, only a single JSON file whose name is computed from the agent name and the id of the LangStream application.
By default, it requires an S3-compatible bucket that must be defined using the `bucketName`, `endpoint`, `access-key`, `secret-key`, and `region` properties.
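For example, a minimal sketch of the agent configuration with the status bucket properties set. The bucket name, endpoint, and region below are placeholders, and in practice the keys would be pulled from your secrets rather than written inline:

```yaml
- name: "Crawl the WebSite"
  type: "webcrawler-source"
  configuration:
    seed-urls: ["https://docs.langstream.ai/"]
    allowed-domains: ["https://docs.langstream.ai"]
    # Status bucket (placeholder values shown for illustration)
    bucketName: "langstream-crawler-status"
    endpoint: "https://s3.eu-west-1.amazonaws.com"
    access-key: "<ACCESS-KEY>"
    secret-key: "<SECRET-KEY>"
    region: "eu-west-1"
```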
Another solution is to store the status on a persistent disk provided by LangStream. This can be achieved by setting `state-storage: disk`.
Example webcrawler agent in a pipeline:

```yaml
pipeline:
  - name: "Crawl the WebSite"
    type: "webcrawler-source"
    configuration:
      seed-urls: ["https://docs.langstream.ai/"]
      allowed-domains: ["https://docs.langstream.ai"]
      forbidden-paths: []
      min-time-between-requests: 500
      reindex-interval-seconds: 3600
      max-error-count: 5
      max-urls: 1000
      max-depth: 50
      handle-robots-file: true
      user-agent: "" # this is computed automatically, but you can override it
      scan-html-documents: true
      http-timeout: 10000
      handle-cookies: true
      max-unflushed-pages: 100
      state-storage: disk
```
You can provide multiple `seed-urls` and `allowed-domains`. To add them to your pipeline, use this syntax:

```yaml
seed-urls:
  - "http://example1.com"
  - "http://example2.com"
allowed-domains:
  - "http://example1.com"
  - "http://example2.com"
```
Input
- None, it’s a source
Output
- A record for each crawled page, with the site URL as a header and the raw HTML as the value.
Check out the full configuration properties in the API Reference page.
The crawl status keeps track of the following:

| Label | Type | Description |
| --- | --- | --- |
| pendingUrls | String | Holds the URLs that have been discovered but are yet to be processed. |
| remainingUrls | String | Holds the URLs that have been discovered but are yet to be processed. |
| visitedUrls | String | Holds all URLs that have been visited to prevent cyclic crawling. |
Using the webcrawler source agent as a starting point, this workflow will crawl a website, get a page's raw HTML, and process that information into chunks for embedding in a vector database.
The complete example code is available in the LangStream repository.
- Topics necessary for the application are declared: we will later pass the chunked embeddings to a vector database via the Kafka "chunks-topic".
name: "Crawl a website"
topics:
- name: "chunks-topic"
creation-mode: create-if-not-exists
- The webcrawler source agent configuration is declared. For help finding your credential information, see Secrets (a sketch of a secrets file is shown after this configuration). For help configuring the webcrawler-source, see Configuration.

```yaml
pipeline:
  - name: "Crawl the WebSite"
    type: "webcrawler-source"
    configuration:
      seed-urls: ["https://docs.langstream.ai/"]
      allowed-domains: ["https://docs.langstream.ai"]
      forbidden-paths: []
      min-time-between-requests: 500
      reindex-interval-seconds: 3600
      max-error-count: 5
      max-urls: 1000
      max-depth: 50
      handle-robots-file: true
      user-agent: "" # this is computed automatically, but you can override it
      scan-html-documents: true
      http-timeout: 10000
      handle-cookies: true
      max-unflushed-pages: 100
      state-storage: disk
```
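This configuration stores the crawl status on disk, so it needs no credentials itself, but later steps (computing embeddings and writing to Astra) do. A minimal sketch of a secrets file, assuming an OpenAI entry whose values are read from environment variables; the id, keys, and variable names here are illustrative assumptions:

```yaml
secrets:
  - id: open-ai
    data:
      access-key: "${OPEN_AI_ACCESS_KEY:-}"  # illustrative environment variable name
      url: "${OPEN_AI_URL:-}"
```

Values declared this way can then be referenced from the application configuration, for example as `{{ secrets.open-ai.access-key }}`.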
The webcrawler itself uses the Jsoup library to parse HTML with the WHATWG HTML spec. The webcrawler explores the web starting from a list of seed URLs and follows links within pages to discover more content.
For each seed-url, the webcrawler:
- Sets up a connection using Jsoup.
- Catches HTTP status errors and handles retries or skips based on the error code (`HttpStatusException` for HTTP errors and `UnsupportedMimeTypeException` for non-HTML content types).
- If the content is HTML, fetches the links (hrefs) from the page and adds them to the list of URLs to be crawled, provided the href passes the allowed-domains rule set in the pipeline configuration.
- Gets the page's HTML and creates a document with the site URL as a header and the raw HTML as output.
The webcrawler then passes the document on to the next agent.
- The text-extractor agent extracts metadata and text from records using Apache Tika.
- name: "Extract text"
type: "text-extractor"
- The text-normaliser agent forces the text into lower case and removes leading and trailing spaces.
- name: "Normalise text"
type: "text-normaliser"
configuration:
make-lowercase: true
trim-spaces: true
- The language detector agent identifies a record’s language. In this case, non-English records are skipped, and English records continue to the next step in the pipeline.
- name: "Detect language"
type: "language-detector"
configuration:
allowedLanguages: ["en"]
property: "language"
- The text-splitter agent splits the document into chunks of text. This is an important part of the vectorization pipeline to understand, because it requires balancing performance against accuracy. `chunk_size` controls the maximum number of characters in each chunk, and `chunk_overlap` controls the amount of overlap between consecutive chunks. A little overlap keeps results more consistent. `chunk_size` defaults to 1000 characters, and `chunk_overlap` defaults to 200 characters.
- name: "Split into chunks"
type: "text-splitter"
configuration:
splitter_type: "RecursiveCharacterTextSplitter"
chunk_size: 400
separators: ["\n\n", "\n", " ", ""]
keep_separator: false
chunk_overlap: 100
length_function: "cl100k_base"
- The document-to-json agent converts the unstructured data to structured JSON.
- name: "Convert to structured data"
type: "document-to-json"
configuration:
text-field: text
copy-properties: true
- The compute agent structures the JSON output into values the final compute step can work with.
- name: "prepare-structure"
type: "compute"
configuration:
fields:
- name: "value.filename"
expression: "properties.name"
type: STRING
- name: "value.chunk_id"
expression: "properties.chunk_id"
type: STRING
- name: "value.language"
expression: "properties.language"
type: STRING
- name: "value.chunk_num_tokens"
expression: "properties.chunk_num_tokens"
type: STRING
- Now that the text is processed and structured, the compute-ai-embeddings agent computes embeddings and sends them to the Kafka "chunks-topic".
- name: "compute-embeddings"
id: "step1"
type: "compute-ai-embeddings"
output: "chunks-topic"
configuration:
model: "text-embedding-ada-002" # This needs to match the name of the model deployment, not the base model
embeddings-field: "value.embeddings_vector"
text: "{{ value.text }}"
batch-size: 10
flush-interval: 500
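The embedding provider itself is not declared in the pipeline file: the agent picks up the model provider defined in the application's configuration resources. A minimal sketch, assuming an OpenAI resource in the application configuration; the resource name and secret ids are illustrative:

```yaml
configuration:
  resources:
    - type: "open-ai-configuration"
      name: "OpenAI configuration"
      configuration:
        url: "{{ secrets.open-ai.url }}"          # typically only needed for Azure OpenAI (assumption)
        access-key: "{{ secrets.open-ai.access-key }}"
        provider: "openai"
```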
- Where to next? If you've got an Astra vector database, use the vector-db-sink agent to sink the vectorized embeddings via the Kafka "chunks-topic" to your database. From there, you can query your vector data, or ask questions with a chatbot. It's up to you!
- name: "Write to Astra"
type: "vector-db-sink"
input: "chunks-topic"
resources:
size: 2
configuration:
datasource: "AstraDatasource"
table-name: "documents"
keyspace: "documents"
mapping: "filename=value.filename, chunk_id=value.chunk_id, language=value.language, text=value.text, embeddings_vector=value.embeddings_vector, num_tokens=value.chunk_num_tokens"