Simple service to connect milvus to nats-io messaging queue and integrate with AI

alexandriaproject-io/milvus-adapter-service

milvus-adapter-service

The Milvus adapter service simplifies operations such as searching or adding items by vectorizing text itself, so callers never need to handle vectors directly. It also integrates NATS.io messaging with JetStream and wraps messages in Thrift for communication with other services.

Milvus Adapter Service Customization

The service offers customization options including NATS subscription modifiers, a choice of indexing method (IVF_FLAT or HNSW), a choice of vectorizing model, and a custom collection name. These options let the service run across multiple deployments that handle different data sets, such as text or images.

Milvus Adapter Service Architecture

The Milvus Adapter service orchestrates operations involving data handling and text processing. It interfaces with a NATS Client to process search, addition, and deletion requests via respective controllers. The service employs Sentence Transformer Workers for advanced text encoding and decoding for accurate and efficient linguistic data management. Tasks are managed through an execution queue and the service is connected to the Milvus DB, ensuring streamlined operations and communication.
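
The flow described above (requests are queued, then vectorized by a pool of workers) can be sketched roughly as follows. This is an illustration only, not the service's actual code: `fake_encode` is a hypothetical stand-in for a Sentence Transformer model, and the pool size simply mirrors the `MILVUS_WORKERS` default of 2.

```python
from concurrent.futures import ThreadPoolExecutor


def fake_encode(text: str) -> list[float]:
    # Hypothetical stand-in for a Sentence Transformer model's encode step.
    return [float(len(text)), float(sum(map(ord, text)) % 97)]


def vectorize_all(texts: list[str], workers: int = 2) -> list[list[float]]:
    # Hand every text to a fixed-size pool; map() preserves input order.
    with ThreadPoolExecutor(max_workers=workers) as pool:
        return list(pool.map(fake_encode, texts))


vectors = vectorize_all(["hello", "milvus adapter"])
```

A fixed-size pool keeps the number of concurrent vectorization jobs bounded, which matters when the model runs on a single device.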

Rest API Server

When enabled, the server listens for REST API requests (without Thrift encoding); the endpoints are described under Rest API Endpoints.

Status API

The service exposes status endpoints to allow for easier integration and health monitoring:

Additionally, the service will expose swagger and Thrift object documentation at:

ENV parameters:

Status server configuration

| Variable Name | Default Value | Values | Description |
|---|---|---|---|
| LOG_LEVEL | info | critical, fatal, error, warning, warn, info, debug | Log level |
| SERVER_HOST | 127.0.0.1 | 0.0.0.0 - 255.255.255.255 | IP address the status server listens on (0.0.0.0 means any IP) |
| SERVER_PORT | 5000 | 1-65535 | Port the status server listens on |

Rest API server configuration

| Variable Name | Default Value | Values | Description |
|---|---|---|---|
| API_SERVER_ENABLED | false | Bool | Whether to enable the REST API server |
| API_SERVER_HOST | 127.0.0.1 | 0.0.0.0 - 255.255.255.255 | IP address the REST API server listens on (0.0.0.0 means any IP) |
| API_SERVER_PORT | 5000 | 1-65535 | Port the REST API server listens on |
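
A minimal sketch of how the REST API server settings listed above might be read from the environment. The helper names `env_bool` and `load_server_config` are hypothetical, and the set of spellings accepted as "true" is an assumption; only the variable names, defaults, and port range come from the configuration above.

```python
import os


def env_bool(name: str, default: bool, env=os.environ) -> bool:
    # Assumption: "1", "true", "yes" and "on" (any case) count as true.
    raw = env.get(name)
    if raw is None:
        return default
    return raw.strip().lower() in {"1", "true", "yes", "on"}


def load_server_config(env=os.environ) -> dict:
    # Defaults follow the REST API server configuration above.
    port = int(env.get("API_SERVER_PORT", "5000"))
    if not 1 <= port <= 65535:
        raise ValueError(f"API_SERVER_PORT out of range: {port}")
    return {
        "enabled": env_bool("API_SERVER_ENABLED", False, env),
        "host": env.get("API_SERVER_HOST", "127.0.0.1"),
        "port": port,
    }


config = load_server_config({"API_SERVER_ENABLED": "True", "API_SERVER_PORT": "4040"})
```

Passing a plain dictionary instead of `os.environ` makes the loader easy to exercise without touching the real environment.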

Nats connection configuration

| Variable Name | Default Value | Values | Description |
|---|---|---|---|
| NATS_ENABLED | false | Bool | Whether to enable the NATS client |
| NATS_URL | - | URL string | NATS connection URL |
| NATS_USER | - | String | NATS auth user name |
| NATS_PASS | - | String | NATS auth password |
| NATS_TLS | False | Bool | Whether to use TLS when connecting to NATS |
| NATS_SUFFIX | 'default' | String | Unique NATS subscription suffix, for example: milvus.add.{suffix} |
| NATS_QUEUE_GROUP | - | String | NATS queue group name, useful when running multiple copies of the same service |
| NATS_GRACE_TIME | 10 | Number | Time in seconds given to the service to finish processing messages when exiting |
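
To illustrate how `NATS_SUFFIX` shapes a subscription subject, here is a small sketch. Only the `milvus.add.{suffix}` pattern and the `default` fallback come from the configuration above; the function name `add_subject` is hypothetical.

```python
import os


def add_subject(env=os.environ) -> str:
    # NATS_SUFFIX falls back to "default", as in the configuration above.
    suffix = env.get("NATS_SUFFIX", "default")
    return f"milvus.add.{suffix}"


subject = add_subject({"NATS_SUFFIX": "images"})
```

Giving each deployment its own suffix keeps, say, a text deployment and an image deployment from consuming each other's messages.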

Milvus connection configuration

| Variable Name | Default Value | Values | Description |
|---|---|---|---|
| MILVUS_HOSTNAME | - | Hostname string | Milvus connection hostname |
| MILVUS_PORT | 19530 | 1-65535 | Milvus connection port |
| MILVUS_USERNAME | - | String | Milvus connection user name |
| MILVUS_PASSWORD | - | String | Milvus connection password |
| MILVUS_USE_TLS | False | Bool | Whether to use TLS when connecting to Milvus |
| MILVUS_WORKERS | 2 | Number | Number of threads that run vectorization and Milvus interactions |
| MILVUS_NUM_PARTITIONS | 64 | 1-4096 | Number of desired partitions in the collection |

Vectorizing configurations

| Variable Name | Default Value | Values | Description |
|---|---|---|---|
| VECTOR_SEGMENT_COLLECTION | - | String | Name of the collection |
| VECTOR_DIM | 768 | Number | Number of dimensions of the vectors in the collection |
| INDEX_USE_IVF_FLAT | False | Bool | Index the collection using the IVF_FLAT algorithm |
| INDEX_USE_HNSW | False | Bool | Index the collection using the HNSW algorithm |
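
A sketch of how the two index flags might translate into Milvus index parameters. The `index_type` values and the parameter keys (`nlist`, `M`, `efConstruction`) are standard Milvus names, but the numeric values, the `L2` metric, and the precedence when both flags are set are assumptions made for illustration and are not taken from the service.

```python
from typing import Optional


def index_params(use_ivf_flat: bool, use_hnsw: bool) -> Optional[dict]:
    # Assumption: HNSW wins if both flags are set; None means no index requested.
    if use_hnsw:
        return {
            "index_type": "HNSW",
            "metric_type": "L2",
            "params": {"M": 16, "efConstruction": 200},  # illustrative values
        }
    if use_ivf_flat:
        return {
            "index_type": "IVF_FLAT",
            "metric_type": "L2",
            "params": {"nlist": 128},  # illustrative value
        }
    return None


params = index_params(use_ivf_flat=True, use_hnsw=False)
```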

Vectorizer model config

| Variable Name | Default Value | Values | Description |
|---|---|---|---|
| VECTOR_MODEL_PATH | - | String | Path of the Sentence Transformer model |
| VECTOR_MODEL_CACHE_FOLDER | - | String | Sentence Transformer cache folder |
| VECTOR_MODEL_DEVICE | cpu | cpu, cuda, ipu, xpu, mkldnn, opengl, opencl, ideep, hip, ve, fpga, ort, xla, lazy, vulkan, mps, meta, hpu, mtia | Sentence Transformer model device |
| HUGGING_FACE_AUTH_TOKEN | - | String | Hugging Face token used to download restricted models |

Nats tester config

| Variable Name | Default Value | Values | Description |
|---|---|---|---|
| NATS_TESTER_USER | - | String | NATS auth user name |
| NATS_TESTER_PASS | - | String | NATS auth password |
| NATS_TESTER_TLS | False | Bool | Whether to use TLS when connecting to NATS |

Run with Docker

NOTE: The service expects Milvus and NATS.io to run as external services

  • With caching ( to avoid re-downloading Sentence transformer models )
docker run --env-file .env -v "[YOUR LOCAL CACHE FOLDER]:/cache" --name "milvus-adapter" niftylius/milvus-adapter
  • Without caching enabled
docker run --env-file .env --name "milvus-adapter" niftylius/milvus-adapter

Run locally

  • Create Python Virtual Environment:
    • python -m venv venv
  • Activate the virtual environment:
    • source venv/bin/activate
  • Install required packages:
    • pip3 install -r requirements.txt
  • Create .env file based on .env.example
    • Change the Model path and config then Run the server:
      • python3 main.py --multiprocess

Run in Docker

You can also run the service in Docker, but make sure you copy the .env.example file and add your Milvus DB configuration.
NOTE: The example contains NATS.io, which uses Thrift for messages

# run milvus adapter docker (to run detached add -d after the `run` command )
docker run --name test_container -p 5050:5050 -p 4040:4040 --env-file .env.example niftylius/milvus-adapter

# Status swagger: http://127.0.0.1:5050/swagger
# Rest API swagger: http://127.0.0.1:4040/swagger

Run with docker-compose

Docker Compose starts a Milvus database server as well as the Milvus adapter.
To run it, download docker-compose.yml from
https://raw.githubusercontent.com/alexandriaproject-io/milvus-adapter-service/main/docker-compose.yml

# download docker-compose.yml with curl
curl https://raw.githubusercontent.com/alexandriaproject-io/milvus-adapter-service/main/docker-compose.yml -o docker-compose.yml

# download docker-compose.yml with wget
wget https://raw.githubusercontent.com/alexandriaproject-io/milvus-adapter-service/main/docker-compose.yml -O docker-compose.yml

# run docker compose (to run detached add -d after the `up` command)
docker compose up

# Status swagger: http://127.0.0.1:5050/swagger
# Rest API swagger: http://127.0.0.1:4040/swagger

NOTE: Milvus may take a while to start.
NOTE: Shut Milvus down properly; otherwise its data can become corrupted.

Rest API Endpoints

POST /api/search

Returns vector search results based on the payload parameters

Payload:

{
  "search": "string",
  // *Required: Search text to find similar items for
  "document_ids": [
    "string"
  ],
  // *Optional: List of document IDs (plays partition role)
  "offset": 0,
  // *Optional: how many responses to skip
  "limit": 0,
  // *Optional: how many responses to return
  "sf": 0
  // *Optional: Search factor ( ef or nprobe depending on the index )
}

Response:

{
  "result_items": 0,
  // Results count
  "results": [
    {
      "distance": 0,
      // Distance value
      "document_id": "string",
      // document_id of the result
      "section_id": "string",
      // section_id of the result
      "segment_id": "string"
      // segment_id of the result
    }
  ]
}
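
For illustration, a search request could be built with Python's standard library as below. The base URL assumes the REST API is published on port 4040, as in the Docker example; the helper name `build_search_request`, the `Content-Type` header, and the sample values are assumptions. The sketch only constructs the request, since sending it requires a running service.

```python
import json
import urllib.request


def build_search_request(base_url: str, search: str, limit: int = 10, offset: int = 0):
    # Only "search" is required; the other fields are optional per the payload above.
    body = {"search": search, "limit": limit, "offset": offset}
    return urllib.request.Request(
        url=f"{base_url}/api/search",
        data=json.dumps(body).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_search_request("http://127.0.0.1:4040", "vector databases", limit=5)
# To send it against a running service:
# with urllib.request.urlopen(request) as response:
#     results = json.load(response)["results"]
```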

POST /api/upsert

Adds a new vector row/item or updates an existing one

Payload:

{
  "segment_text": "string",
  // Text to vectorize and save
  "document_id": "string",
  // Id of the document ( plays partition role )
  "section_id": "string",
  // Id of the section
  "segment_id": "string"
  // Id of the segment
}

Response:

{
  "insert_count": 1,
  // Inserted segments
  "upsert_count": 1,
  // Updated segments
  "delete_count": 1
  // Deleted segments
}

NOTE: All three counts are currently returned as 1 because of an issue with the Milvus library
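
A client may want to check an upsert payload before sending it. The sketch below assumes all four fields are required, non-empty strings (they are declared without `optional` in the Thrift payload); the helper name `build_upsert_body` and the sample IDs are hypothetical.

```python
import json

UPSERT_FIELDS = ("segment_text", "document_id", "section_id", "segment_id")


def build_upsert_body(**fields: str) -> bytes:
    # Assumption: all four fields are required, non-empty strings.
    missing = [name for name in UPSERT_FIELDS if not fields.get(name)]
    if missing:
        raise ValueError(f"missing upsert fields: {', '.join(missing)}")
    return json.dumps({name: fields[name] for name in UPSERT_FIELDS}).encode("utf-8")


body = build_upsert_body(
    segment_text="Milvus stores vectors.",
    document_id="doc-1",
    section_id="sec-1",
    segment_id="seg-1",
)
```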

POST /api/delete

The endpoint will delete the requested vector
Warning: Deleting vectors can cause lag spikes and temporary performance slowdowns

Payload:

{
  "document_id": "string",
  // Id of the document ( plays partition role )
  "section_id": "string",
  // Id of the section
  "segment_id": "string"
  // Id of the segment
}

Response:

{
  "delete_count": 1
  // Deleted segments
}

Subscriptions

Thrift search request

struct MilvusSegmentGetRequest {
  1: string search;                         // Search text to find similar items for
  2: optional list<string> document_ids,    // List of document IDs (plays partition role)
  3: optional i16 offset;                   // how many responses to skip
  4: optional i16 limit;                    // how many responses to return
  5: optional i16 sf;                       // Search factor ( ef or nprobe depending on the index )
}
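
Because `offset`, `limit`, and `sf` are declared as `i16`, their values must fit in a signed 16-bit integer (-32768 to 32767). A caller could guard against overflow before encoding a request, as in this sketch; the helper name `check_i16` is hypothetical, and it checks only the type's range, not whether a value makes sense to the service.

```python
I16_MIN, I16_MAX = -(2**15), 2**15 - 1


def check_i16(name: str, value: int) -> int:
    # Thrift i16 is a signed 16-bit integer.
    if not I16_MIN <= value <= I16_MAX:
        raise ValueError(f"{name}={value} does not fit in a Thrift i16")
    return value


limit = check_i16("limit", 100)
```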

Thrift search response

struct L2SegmentSearchResult {
  1: double distance,               // Distance value
  2: string document_id,            // document_id of the result
  3: string section_id,             // section_id of the result
  4: string segment_id              // segment_id of the result
}

struct L2SegmentSearchResponse {
  1: list<L2SegmentSearchResult> results,   // List of results
  2: i32 total                              // Results count
  3: bool is_error                          // Is error or not
  4: optional string error_text             // Error message text
}

Thrift add message / request

struct MilvusSegmentUpsertPayload {
  1: string segment_text;           // Text to vectorize and save
  2: string document_id;            // Id of the document ( plays partition role )
  3: string section_id;             // Id of the section
  4: string segment_id;             // Id of the segment
}

Thrift add response

struct L2SegmentUpsertResponse {
    1: i32 insert_count             // Inserted segments
    2: i32 update_count             // Updated segments
    3: i32 delete_count             // Deleted segments
    4: bool is_error                // Is error or not
    5: optional string error_text   // Error message text
}

Thrift delete message / request

struct MilvusSegmentDeletePayload {
  2: string document_id;            // Id of the document ( plays partition role )
  3: string section_id;             // Id of the section
  4: string segment_id;             // Id of the segment
}

Thrift delete response

struct L2SegmentDeleteResponse {
    1: i32 delete_count             // Deleted segments
    3: bool is_error                // Is error or not
    4: optional string error_text   // Error message text
}
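
Every response struct carries `is_error` and an optional `error_text`, so a consumer will typically check those before trusting the counts. The sketch below uses a plain dataclass as a stand-in for the generated Thrift class; `DeleteResponse` and `deleted_or_raise` are hypothetical names, and the fallback error message is an assumption.

```python
from dataclasses import dataclass
from typing import Optional


@dataclass
class DeleteResponse:
    # Plain-Python mirror of L2SegmentDeleteResponse, for illustration only.
    delete_count: int
    is_error: bool
    error_text: Optional[str] = None


def deleted_or_raise(response: DeleteResponse) -> int:
    # Surface service-side failures as exceptions instead of silently returning a count.
    if response.is_error:
        raise RuntimeError(response.error_text or "unknown Milvus adapter error")
    return response.delete_count


count = deleted_or_raise(DeleteResponse(delete_count=1, is_error=False))
```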
