Serving embeddings at blazing speed

Sorry in advance for the catchy title: serving embeddings is usually bottlenecked by computing them in the first place, i.e. running the model on the input. That is not what I’m going to dive into below. This is more of a hands-on experimentation article where I try different methods, on both the client and the server, to send and receive numpy arrays.

In this article, I’ll compare different methods of serving numpy arrays. Why would you ever need that, you might ask? These kinds of routes typically show up when serving embedding models.

Shameless plug here: an example of this kind of route can be found in my recent project cria, an open-source OpenAI-compatible API for serving LLaMA models, written entirely in Rust. This API has a /embeddings route that generates a vector representation of a given input string or vector of strings.

Nice chat! Now let’s implement a simple API using FastAPI that takes an EmbeddingRequest and returns a batch of embeddings:

from typing import List
import numpy as np
from fastapi import FastAPI, Response
from fastapi.responses import ORJSONResponse
from pydantic import BaseModel

DIM = 1024
app = FastAPI()

class EmbeddingRequest(BaseModel):
    model_name: str
    input: str | List[str]

@app.post("/json_embed/")
async def json_embed(req: EmbeddingRequest):
    return {
        "model_name": req.model_name,
        "embedding": np.random.randn(len(req.input), DIM)
                      .astype(np.float32)
                      .tolist(),
    }

Notice two things:

  1. We are calling tolist() on a Numpy array to effectively convert it into a Python list
  2. We are serializing the result into a JSON object before sending it to the client

This just feels sad… Before diving into the different experiments I ran, let’s first write the client. I’ll use the aiohttp library to get a fast async client for our tests.

import asyncio

import aiohttp

# Values assumed for this demo client
endpoint = "http://localhost:8000/json_embed/"
model_name = "test_model"
input = "this is a test"

dim = 1024
n_batch = 32
n_reqs = 100

async def send_req(client):
    async with client.post(
        endpoint, json={"model_name": model_name, "input": [input] * n_batch}
    ) as resp:
        assert resp.status == 200
        return await resp.json()


async def main():
    async with aiohttp.ClientSession() as client:
        json_responses = await asyncio.gather(
            *[send_req(client) for _ in range(n_reqs)]
        )
    assert len(json_responses) == n_reqs


if __name__ == "__main__":
    asyncio.run(main())

I guess I’ll have to explain a little bit why async. Async is a concurrency model that is far more efficient than multi-threading for IO-bound tasks. In this example, I use a single session to spawn 100 concurrent tasks and await their results. Because querying an API is IO-bound by design, you can interleave sending requests and receiving responses to improve performance. In contrast, a synchronous client would send a request, wait for the response, parse it, and only then start all over again. The goal of asynchronous programming is simple: don’t waste time waiting!

Great, let’s go back to our server and address the first issue: JSON serialization. One obvious solution is to serialize the numpy array directly and avoid all the heap allocations of Python objects created when converting a contiguous numpy array into nested Python lists. Let’s do it:

In [1]: import numpy as np
In [2]: import json
In [3]: json.dumps(np.random.randn(100))

TypeError: Object of type ndarray is not JSON serializable

Not so fast! The error is pretty explicit: the default JSON encoder (the one FastAPI uses out of the box) can’t serialize numpy arrays!

OK, how about we change the serializer? One great choice is the orjson library. It supports numpy arrays with a minimal memory footprint and is REALLY fast. Thanks to FastAPI’s modularity, we can swap in a custom Response class quite easily:

from typing import Any

import orjson

class EmbeddingORJSONResponse(Response):
    media_type = "application/json"

    def render(self, content: Any) -> bytes:
        return orjson.dumps(content, option=orjson.OPT_SERIALIZE_NUMPY)

@app.post("/orjson_embed/", response_class=EmbeddingORJSONResponse)
async def embed(req: EmbeddingRequest):
    return EmbeddingORJSONResponse(
        {
            "model_name": req.model_name,
            "embedding": np.random.randn(len(req.input), DIM).astype(np.float32),
        }
    )

A simple change on the client side as well:

import orjson

async def send_req(client):
    async with client.post(
        endpoint, json={"model_name": model_name, "input": [input] * n_batch}
    ) as resp:
        assert resp.status == 200
        return orjson.loads(await resp.read())

Now let’s benchmark our change. I’m running the server and client on the same machine: an M1 with 16 GB of RAM. I’m using uvicorn as the web server with 7 workers. Each client requests a batch of 32 f32 vectors of dimension 1024.
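
For reference, the server is launched with something like this (assuming the FastAPI app above lives in a file called server.py; the filename is my guess):

❯ uvicorn server:app --workers 7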

I’m also using hyperfine to run the benchmarks:

❯ hyperfine -i ./async_client_json.py ./async_client_orjson.py
Benchmark 1: ./async_client_json.py
  Time (mean ± σ):      3.548 s ±  0.135 s    [User: 0.535 s, System: 0.052 s]
  Range (min … max):    3.452 s …  3.883 s    10 runs

Benchmark 2: ./async_client_orjson.py
  Time (mean ± σ):     317.4 ms ±  16.9 ms    [User: 167.7 ms, System: 40.3 ms]
  Range (min … max):   305.1 ms … 362.2 ms    10 runs

Summary
  './async_client_orjson.py' ran
   11.18 ± 0.73 times faster than './async_client_json.py'

That’s an 11.18x speedup!

Not bad! But can we do more?

One thing I kept coming back to is: why are we serializing the numpy array on the server and then deserializing it on the client at all? In a scenario where both the client and the server trust each other and have the same endianness, we can send the vector directly as bytes and load it on the client side. NO SERIALIZATION needed. Here is the implementation; I called the route raw_embed:

from fastapi import Response

@app.post("/raw_embed/")
async def raw_embed(req: EmbeddingRequest):
    data = np.random.randn(len(req.input), DIM).astype(np.float32)
    return Response(content=data.tobytes())

You can see here that I am using FastAPI’s Response object directly and setting the content to the raw bytes of the embedding matrix. On the client side, I read the content and load it back into a numpy array:

async def send_req(client):
    async with client.post(
        endpoint, json={"model_name": model_name, "input": [input] * n_batch}
    ) as resp:
        assert resp.status == 200
        data = await resp.read()
        emb = np.frombuffer(data, dtype=np.float32).reshape(n_batch, dim)

How much did we gain? Let’s first increase the batch size to 256 and keep the number of requests at 100:

Summary
  './async_client_raw.py' ran
    2.88 ± 0.45 times faster than './async_client_orjson.py'

2.88x compared to using orjson! A win is a win…

Sending raw, unencoded numpy arrays is therefore roughly 32 times faster than going through standard JSON (11.18 × 2.88). But keep in mind that I am testing the client and the server on the same machine. That’s great and all, but how slow is this transfer of bytes, really? If I crank the number of requests up to, say, 1000 on async_client_raw, I top out at ~200 MiB/s. Running an iperf3 client and server on the M1 shows a maximal bitrate of 110 Gbit/s, which is roughly 13 GB/s. Of course, that is raw TCP and not HTTP, but still, the difference is staggering.

This is where Rust comes in …

Once you learn Rust, you can’t help but think: how about I write it in Rust?

I picked up Rust last year, wrote some personal and professional projects with it, and it has become my favorite language by far. The combination of expressiveness and performance, plus the modern tooling and broad ecosystem, makes it pretty great!

I have used web application frameworks in Rust in the past, but never an async HTTP client. The popular choice here is reqwest, which provides both async and blocking high-level HTTP clients. So let’s try it!

Here is the full client code:

#![feature(array_chunks)]

use std::mem;
use std::time::Duration;

use anyhow::Result;
use bytes::Bytes;
use futures::{stream, StreamExt};
use reqwest::{self, Client};
use serde::Serialize;

use clap::Parser;

const F32_SIZE: usize = mem::size_of::<f32>();

#[derive(Parser, Debug)]
#[command(author, version, about, long_about = None)]
struct Args {
    #[arg(long, default_value_t = 100)]
    n_reqs: usize,

    #[arg(long, default_value_t = 256)]
    batch_size: usize,

    #[arg(long, default_value_t = 10)]
    n_concurrent: usize,

    #[arg(short, long, default_value = "http://127.0.0.1:8000/raw_embed")]
    endpoint: String,
}
#[derive(Serialize, Debug)]
struct Request {
    model_name: &'static str,
    input: Vec<String>,
}
impl Request {
    fn new(batch_size: usize) -> Self {
        Self {
            model_name: "test_model",
            input: vec!["this is a test".to_string(); batch_size],
        }
    }
}

fn parse_arr(data: &Bytes) -> Vec<f32> {
    // unsafe { slice::from_raw_parts(data.as_ptr() as *const f32, data.len() / 4) }
    // array_chunks is a nightly feature.
    // This is the safe way to go from &[u8] to Vec<f32>:
    data.array_chunks::<F32_SIZE>()
        .map(|c| f32::from_le_bytes(*c))
        .collect()
}

async fn send_request(
    request_id: usize,
    client: &Client,
    endpoint: &str,
    batch_size: usize,
) -> Result<()> {
    let resp = client
        .post(endpoint)
        .json(&Request::new(batch_size))
        .timeout(Duration::from_secs(1))
        .send()
        .await
        .map_err(|e| {
            tracing::error!("{:?}", e);
            e
        })?;
    let data = resp.bytes().await?;
    tracing::info!("[REQ {}] Received data : {}", request_id, data.len());
    let arr = parse_arr(&data);
    tracing::info!("Arr last val {:?}", arr.last());
    Ok(())
}

#[tokio::main]
async fn main() -> Result<()> {
    let args = Args::parse();

    tracing_subscriber::fmt::init();

    let client = reqwest::Client::builder()
        .pool_idle_timeout(Duration::from_millis(1))
        .pool_max_idle_per_host(10)
        .build()?;

    stream::iter(0..args.n_reqs)
        .map(|i| {
            let client = &client;
            send_request(i, client, &args.endpoint, args.batch_size)
        })
        .buffer_unordered(args.n_concurrent)
        .map(|f| f.unwrap())
        .collect::<Vec<_>>()
        .await;

    Ok(())
}

Notice here that I used a Stream to asynchronously produce the requests. The requests are executed with `buffer_unordered(N)`, which keeps up to N requests in flight at a time. I find this pattern on streams beautiful: you get a built-in rate limiter, and you can chain the execution of multiple streams asynchronously. The buffering is unordered to reduce latency. I also like how readable this code is!

I also rewrote the server using axum, exposing the exact same /raw_embed route that sends raw bytes in the response body:

async fn raw_embed(
    // this argument tells axum to parse the request body
    // as JSON into an `EmbeddingRequest` type
    Json(payload): Json<EmbeddingRequest>,
) -> Bytes {
    let mut rng = rand::thread_rng();
    let dim = payload.input.len() * 1024;
    let data: Vec<f32> = (0..dim).map(|_| rng.gen::<f32>()).collect::<Vec<_>>();
    // Reinterpret the f32 buffer as bytes (same layout, no copy at this point).
    let raw_data: &[u8] =
        unsafe { slice::from_raw_parts(data.as_ptr() as *const u8, data.len() * 4) };

    tracing::info!("Sent raw_vec with last element : {:?}", data.last());
    // `Bytes::from` only accepts 'static slices, so copy the borrowed slice instead.
    Bytes::copy_from_slice(raw_data)
}
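
The handler above leans on a few pieces that are not shown: the request type and the router wiring. Here is a minimal sketch of what they could look like, assuming axum 0.7 and a request struct mirroring the Python model (the struct fields and the bind address are my guesses):

use axum::{routing::post, Json, Router};
use bytes::Bytes;
use rand::Rng;
use serde::Deserialize;
use std::slice;

// Mirrors the Pydantic model on the Python side.
#[derive(Deserialize)]
struct EmbeddingRequest {
    model_name: String,
    input: Vec<String>,
}

#[tokio::main]
async fn main() {
    tracing_subscriber::fmt::init();
    // Same route name as the FastAPI version; the bind address is an assumption.
    let app = Router::new().route("/raw_embed", post(raw_embed));
    let listener = tokio::net::TcpListener::bind("0.0.0.0:8000").await.unwrap();
    axum::serve(listener, app).await.unwrap();
}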

Now let’s first run the benchmark using async_client_raw and our fast-client, both hitting the Axum server (it’s faster than FastAPI, trust me):

Summary
  './fast-client/bins/fast-client --n-concurrent 10 --n-reqs 1000' ran
    1.83 ± 0.15 times faster than './async_client_raw.py'

If you didn’t believe me on the Axum vs FastAPI claim, here is the benchmark of the fast-client against both the FastAPI and Axum endpoints:

Summary
  './fast-client/bins/fast-client --n-concurrent 10 --n-reqs 1000' ran
    9.02 ± 0.67 times faster than './fast-client/bins/fast-client --n-concurrent 10 --n-reqs 1000 --endpoint http://localhost:8001/raw_embed'

And that’s Rust for you! That’s a 9x improvement between Axum and FastAPI!

Also, using the Rust client yields a 1.83x improvement over the asyncio client!

One last trick I tried is zero-copy parsing of the bytes on the client side: I simply cast the &[u8] slice into a &[f32]. Of course, this operation is unsafe; it relies on the buffer being suitably aligned for f32 (which it is here in practice), and computing the new length is trivial. Here is the updated parse_arr function, together with the benchmark:

fn parse_arr(data: &Bytes) -> &[f32] {
    unsafe { slice::from_raw_parts(data.as_ptr() as *const f32, data.len() / 4) }
}

Summary
  './fast-client/bins/fast-client-zcp --n-concurrent 10 --n-reqs 1000' ran
    1.15 ± 0.09 times faster than './fast-client/bins/fast-client --n-concurrent 10 --n-reqs 1000'

Finally, that’s a 1.15x speedup compared to the previous version.
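
One caveat worth spelling out: this cast is only sound if the byte buffer happens to be suitably aligned for f32 and the bytes are in the machine’s endianness. If you want to be defensive about it, a debug assertion is essentially free. Here is a hypothetical parse_arr_checked variant, a sketch rather than what I benchmarked above:

use bytes::Bytes;
use std::{mem, slice};

fn parse_arr_checked(data: &Bytes) -> &[f32] {
    // Only reinterpret the buffer if it is aligned for f32 and holds a whole
    // number of f32 values; endianness is still trusted to match the server.
    debug_assert_eq!(data.as_ptr() as usize % mem::align_of::<f32>(), 0);
    debug_assert_eq!(data.len() % mem::size_of::<f32>(), 0);
    unsafe { slice::from_raw_parts(data.as_ptr() as *const f32, data.len() / 4) }
}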

Now, running the fast-client against the fast-server, I can see a maximum network throughput of 1.9 Gbit/s!


That’s it for me! I frequently find myself creating toy experiments to compare different solutions while coding. I truly believe that these small experiments enhance your judgment and refine your perspective on what is important and what isn’t. Nothing can replace hands-on experience, and I hope that sharing this brief journey in vector serving has been informative! If you come across anything unusual in the code or if you want more details, feel free to reach out to me.