Milvus

Overview

This page guides you through the process of setting up the Milvus destination connector.

The connector works in three stages:

  • Processing - split individual records into chunks that fit the embedding model's context window, and decide which fields to use as context and which to store as supplementary metadata.
  • Embedding - convert the text into a vector representation using a pre-trained model. Currently, OpenAI's text-embedding-ada-002 and Cohere's embed-english-light-v2.0 are supported.
  • Indexing - store the vectors in a vector database for similarity search.

Prerequisites

To use the Milvus destination, you'll need:

  • An account with API access for OpenAI or Cohere (depending on which embedding method you want to use)
  • Either a running self-managed Milvus instance or a Zilliz account

You'll need the following information to configure the destination:

  • Embedding service API Key - The API key for your OpenAI or Cohere account
  • Milvus Endpoint URL - The URL of your Milvus instance
  • Either a Milvus API token or a Milvus instance username and password
  • Milvus Collection name - The name of the collection to load data into

Features

| Feature | Supported? | Notes |
|---|---|---|
| Full Refresh Sync | Yes | |
| Incremental - Append Sync | Yes | |
| Incremental - Append + Deduped | Yes | |
| Partitions | No | |
| Record-defined ID | No | Auto-id needs to be enabled |

Configuration

Processing

Each record is split into text fields and metadata fields as configured in the "Processing" section. All text fields are concatenated into a single string, which is then split into chunks of the configured length. If specified, the metadata fields are stored as-is along with the embedded text chunks. The chunking options are implemented with the Langchain Python library.
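The split between text and metadata fields can be pictured with a minimal sketch. The helper name split_record, the "field: value" layout, and the newline separator are assumptions made for illustration; the connector's actual concatenation format is not specified here.

```python
from typing import Any, Dict, List, Tuple


def split_record(
    record: Dict[str, Any],
    text_fields: List[str],
    metadata_fields: List[str],
) -> Tuple[str, Dict[str, Any]]:
    """Return (concatenated text, metadata) for one record.

    Hypothetical helper: the "field: value" layout and the newline
    separator are assumptions, not the connector's documented format.
    """
    # Text fields are joined into one string that is chunked later.
    text_parts = [
        f"{name}: {record[name]}" for name in text_fields if name in record
    ]
    # Metadata fields are copied through unchanged.
    metadata = {name: record[name] for name in metadata_fields if name in record}
    return "\n".join(text_parts), metadata


record = {"title": "Milvus", "body": "A vector database.", "author": "jane"}
text, metadata = split_record(record, ["title", "body"], ["author"])
print(text)
print(metadata)
```

Fields missing from a record are skipped in this sketch rather than raising an error; that behavior is also an assumption.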

When specifying text fields, you can access nested fields in the record by using dot notation, e.g. user.name will access the name field in the user object. It's also possible to use wildcards to access all fields in an object, e.g. users.*.name will access the name field in every entry of the users array.
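As a rough illustration of how such paths might resolve, the sketch below walks a record one path segment at a time. The function resolve_path is hypothetical and is not the connector's implementation; it only mirrors the dot and wildcard behavior described above.

```python
from typing import Any, List


def resolve_path(value: Any, path: str) -> List[Any]:
    """Return every value matched by a dot-notation path.

    A "*" segment matches all children of a dict or list.
    Hypothetical helper written for illustration only.
    """
    matches = [value]
    for segment in path.split("."):
        next_matches: List[Any] = []
        for current in matches:
            if segment == "*":
                if isinstance(current, dict):
                    next_matches.extend(current.values())
                elif isinstance(current, list):
                    next_matches.extend(current)
            elif isinstance(current, dict) and segment in current:
                next_matches.append(current[segment])
        matches = next_matches
    return matches


record = {
    "user": {"name": "Ada"},
    "users": [{"name": "Bob"}, {"name": "Cy"}],
}
print(resolve_path(record, "user.name"))
print(resolve_path(record, "users.*.name"))
```

Paths that do not match anything yield an empty list in this sketch.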

The chunk length is measured in tokens produced by the tiktoken library. The maximum is 8191 tokens, which is the maximum length supported by the text-embedding-ada-002 model.
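The token-based limit can be sketched as follows. To stay self-contained, the example splits on whitespace as a stand-in for tiktoken tokens, so the counts will differ from real token counts. The chunk_tokens helper and its overlap parameter are assumptions for illustration.

```python
from typing import List, Sequence

# Maximum chunk length stated above for text-embedding-ada-002.
MAX_CHUNK_TOKENS = 8191


def chunk_tokens(
    tokens: Sequence[str], chunk_size: int, overlap: int = 0
) -> List[List[str]]:
    """Split a token sequence into chunks of at most chunk_size tokens.

    Hypothetical helper; consecutive chunks share `overlap` tokens.
    """
    if not 0 < chunk_size <= MAX_CHUNK_TOKENS:
        raise ValueError(f"chunk_size must be between 1 and {MAX_CHUNK_TOKENS}")
    if not 0 <= overlap < chunk_size:
        raise ValueError("overlap must be non-negative and smaller than chunk_size")
    step = chunk_size - overlap
    chunks: List[List[str]] = []
    for start in range(0, len(tokens), step):
        chunks.append(list(tokens[start:start + chunk_size]))
        # Stop once the current chunk reaches the end of the input.
        if start + chunk_size >= len(tokens):
            break
    return chunks


# Whitespace splitting stands in for a real tokenizer here.
tokens = "a b c d e f g".split()
print(chunk_tokens(tokens, chunk_size=3))
print(chunk_tokens(tokens, chunk_size=3, overlap=1))
```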

The stream name is added to each document as the metadata field _ab_stream. If available, the primary key of the record is used to identify the document, which avoids duplicates when updated versions of records are indexed. It is added as the _ab_record_id metadata field.
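A minimal sketch of how these two metadata fields could be attached is shown below. The helper build_metadata is hypothetical, and the ID format (primary key values joined with an underscore) is an assumption; the connector's exact ID format is not documented here.

```python
from typing import Any, Dict, List, Optional


def build_metadata(
    stream: str, record: Dict[str, Any], primary_key: Optional[List[str]]
) -> Dict[str, Any]:
    """Return the connector-style metadata for one record (illustrative only)."""
    metadata: Dict[str, Any] = {"_ab_stream": stream}
    if primary_key:
        # Assumed format: primary key values joined with "_".
        metadata["_ab_record_id"] = "_".join(str(record[key]) for key in primary_key)
    return metadata


print(build_metadata("users", {"id": 7, "name": "Ada"}, ["id"]))
print(build_metadata("events", {"name": "click"}, None))
```

When a stream has no primary key, only _ab_stream is set in this sketch.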

Embedding

The connector can use one of the following embedding methods:

  1. OpenAI - using the OpenAI API, the connector will produce embeddings using the text-embedding-ada-002 model with 1536 dimensions. This integration will be constrained by the speed of the OpenAI embedding API.

  2. Cohere - using the Cohere API, the connector will produce embeddings using the embed-english-light-v2.0 model with 1024 dimensions.

For testing purposes, it's also possible to use the Fake embeddings integration. It will generate random embeddings and is suitable to test a data pipeline without incurring embedding costs.
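The idea behind fake embeddings can be sketched in a few lines: each text is mapped to a random vector of the configured size, with no API call involved. The function fake_embed, its default dimension, and the seed parameter are assumptions for illustration and not the connector's code.

```python
import random
from typing import List, Optional


def fake_embed(
    texts: List[str], dimensions: int = 1536, seed: Optional[int] = None
) -> List[List[float]]:
    """Return one random vector per input text (illustrative stand-in)."""
    rng = random.Random(seed)
    return [[rng.gauss(0.0, 1.0) for _ in range(dimensions)] for _ in texts]


vectors = fake_embed(["first chunk", "second chunk"], dimensions=8, seed=0)
print(len(vectors), len(vectors[0]))
```

Because the vectors are random, similarity search results on such data carry no meaning; the sketch is only useful for checking that data flows end to end.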

Indexing

If the specified collection doesn't exist, the connector will create it for you with a primary key field pk and the configured vector field matching the embedding configuration. Dynamic fields will be enabled. The vector field will have an L2 IVF_FLAT index with an nlist parameter of 1024.

If you want to change any of these settings, create a new collection in your Milvus instance yourself. Make sure that:

  • The primary key field has auto_id enabled
  • There is a vector field with the correct dimensionality (1536 for OpenAI, 1024 for Cohere) and a configured index

If the record contains a field with the same name as the primary key, it will be prefixed with an underscore so Milvus can control the primary key internally.
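The renaming rule can be illustrated with a short sketch. The helper avoid_primary_key_collision is hypothetical, and the default primary key name pk is taken from the auto-created collection described above.

```python
from typing import Any, Dict


def avoid_primary_key_collision(
    record: Dict[str, Any], primary_field: str = "pk"
) -> Dict[str, Any]:
    """Prefix a field that clashes with the primary key with an underscore."""
    if primary_field not in record:
        return dict(record)
    renamed = {key: value for key, value in record.items() if key != primary_field}
    # The record's own value is kept under the prefixed name so that
    # Milvus can assign the real primary key itself.
    renamed[f"_{primary_field}"] = record[primary_field]
    return renamed


print(avoid_primary_key_collision({"pk": 1, "text": "hello"}))
```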

Setting up a collection

When using Zilliz Cloud, this can be done in the UI. In this case, only the collection name and the vector dimensionality need to be configured; the vector field and its index are created automatically under the name vector. Using the REST API, the following request will create the collection:

POST /v1/vector/collections/create
{
  "collectionName": "my-collection",
  "dimension": 1536,
  "metricType": "L2",
  "vectorField": "vector",
  "primaryField": "pk"
}
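The same request can be assembled from Python using only the standard library. This is a sketch under assumptions: the helper name, the placeholder endpoint, and the Bearer token header are illustrative and should be checked against the Zilliz REST API documentation. The request is built but not sent, so the example runs offline.

```python
import json
import urllib.request


def build_create_collection_request(
    endpoint: str, token: str, collection_name: str, dimension: int
) -> urllib.request.Request:
    """Build (but do not send) the collection-creation request shown above."""
    payload = {
        "collectionName": collection_name,
        "dimension": dimension,
        "metricType": "L2",
        "vectorField": "vector",
        "primaryField": "pk",
    }
    return urllib.request.Request(
        url=endpoint.rstrip("/") + "/v1/vector/collections/create",
        data=json.dumps(payload).encode("utf-8"),
        headers={
            # Assumed authentication scheme: API token as a Bearer header.
            "Authorization": f"Bearer {token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


# Placeholder endpoint and token; replace with your own values.
request = build_create_collection_request(
    "https://example.invalid", "my-api-key", "my-collection", 1536
)
print(request.get_method(), request.full_url)
# urllib.request.urlopen(request) would send it; omitted to keep the sketch offline.
```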

When using a self-hosted Milvus cluster, the collection needs to be created using the Milvus CLI or Python client. The following commands will create a collection set up for loading data via Airbyte:

from pymilvus import Collection, CollectionSchema, DataType, FieldSchema, connections

# Connect to a locally running Milvus instance without authentication
connections.connect()

# Primary key generated by Milvus (auto_id), as the connector requires
pk = FieldSchema(name="pk", dtype=DataType.INT64, is_primary=True, auto_id=True)
# Vector field sized for OpenAI embeddings; use dim=1024 for Cohere
vector = FieldSchema(name="vector", dtype=DataType.FLOAT_VECTOR, dim=1536)
# Dynamic fields let text and metadata be stored alongside the vector
schema = CollectionSchema(fields=[pk, vector], enable_dynamic_field=True)
collection = Collection(name="test_collection", schema=schema)
collection.create_index(
    field_name="vector",
    index_params={"metric_type": "L2", "index_type": "IVF_FLAT", "params": {"nlist": 1024}},
)

Langchain integration

To initialize a Langchain vector store based on the indexed data, use the following code:

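# Import paths depend on the Langchain version; newer releases move these
# classes to the langchain_openai and langchain_community packages.
from langchain.embeddings import OpenAIEmbeddings
from langchain.vectorstores import Milvus

embeddings = OpenAIEmbeddings(openai_api_key="my-key")
vector_store = Milvus(embedding_function=embeddings, collection_name="my-collection", connection_args={"uri": "my-zilliz-endpoint", "token": "my-api-key"})
vector_store.fields.append("text")
# call vector_store.fields.append() for all fields you need from the metadata

vector_store.similarity_search("test")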

Changelog

| Version | Date | Pull Request | Subject |
|---|---|---|---|
| 0.0.42 | 2024-12-28 | 50508 | Update dependencies |
| 0.0.41 | 2024-12-21 | 50169 | Update dependencies |
| 0.0.40 | 2024-12-14 | 49307 | Update dependencies |
| 0.0.39 | 2024-11-25 | 48655 | Update dependencies |
| 0.0.38 | 2024-11-04 | 47789 | Update dependencies |
| 0.0.37 | 2024-10-28 | 47505 | Update dependencies |
| 0.0.36 | 2024-10-23 | 47080 | Update dependencies |
| 0.0.35 | 2024-10-05 | 46483 | Update dependencies |
| 0.0.34 | 2024-09-28 | 46165 | Update dependencies |
| 0.0.33 | 2024-09-21 | 45802 | Update dependencies |
| 0.0.32 | 2024-09-14 | 45573 | Update dependencies |
| 0.0.31 | 2024-09-07 | 45281 | Update dependencies |
| 0.0.30 | 2024-08-31 | 44960 | Update dependencies |
| 0.0.29 | 2024-08-24 | 44754 | Update dependencies |
| 0.0.28 | 2024-08-22 | 44530 | Update test dependencies |
| 0.0.27 | 2024-08-17 | 44242 | Update dependencies |
| 0.0.26 | 2024-08-12 | 43776 | Update dependencies |
| 0.0.25 | 2024-08-10 | 43619 | Update dependencies |
| 0.0.24 | 2024-08-03 | 43111 | Update dependencies |
| 0.0.23 | 2024-07-27 | 42823 | Update dependencies |
| 0.0.22 | 2024-07-20 | 42213 | Update dependencies |
| 0.0.21 | 2024-07-13 | 41793 | Update dependencies |
| 0.0.20 | 2024-07-10 | 41495 | Update dependencies |
| 0.0.19 | 2024-07-06 | 40632 | Update dependencies |
| 0.0.18 | 2024-06-26 | 40537 | Update dependencies |
| 0.0.17 | 2024-06-25 | 40446 | Update dependencies |
| 0.0.16 | 2024-06-22 | 40161 | Update dependencies |
| 0.0.15 | 2024-05-20 | 38276 | Replace AirbyteLogger with logging.Logger |
| 0.0.14 | 2024-3-22 | #37333 | Update CDK & pytest version to fix security vulnerabilities |
| 0.0.13 | 2024-3-22 | #35911 | Move to poetry; Fix tests |
| 0.0.12 | 2023-12-11 | #33303 | Fix bug with embedding special tokens |
| 0.0.11 | 2023-12-01 | #32697 | Allow omitting raw text |
| 0.0.10 | 2023-11-16 | #32608 | Support deleting records for CDC sources |
| 0.0.9 | 2023-11-13 | #32357 | Improve spec schema |
| 0.0.8 | 2023-11-08 | #31563 | Auto-create collection if it doesn't exist |
| 0.0.7 | 2023-10-23 | #31563 | Add field mapping option |
| 0.0.6 | 2023-10-19 | #31599 | Base image migration: remove Dockerfile and use the python-connector-base image |
| 0.0.5 | 2023-10-15 | #31329 | Add OpenAI-compatible embedder option |
| 0.0.4 | 2023-10-04 | #31075 | Fix OpenAI embedder batch size |
| 0.0.3 | 2023-09-29 | #30820 | Update CDK |
| 0.0.2 | 2023-08-25 | #30689 | Update CDK to support azure OpenAI embeddings and text splitting options, make sure primary key field is not accidentally set, promote to certified |
| 0.0.1 | 2023-08-12 | #29442 | Milvus connector with some embedders |