Skip to content

Commit

Permalink
wip: client test with rust based client
Browse files Browse the repository at this point in the history
  • Loading branch information
vlastahajek committed Dec 27, 2024
1 parent 2b5a7cd commit a51b569
Show file tree
Hide file tree
Showing 4 changed files with 375 additions and 11 deletions.
1 change: 1 addition & 0 deletions packages/client/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -88,6 +88,7 @@
},
"dependencies": {
"@grpc/grpc-js": "^1.9.9",
"@lakehouse-rs/flight-sql-client": "^0.0.10",
"@protobuf-ts/grpc-transport": "^2.9.1",
"@protobuf-ts/grpcweb-transport": "^2.9.1",
"@protobuf-ts/runtime-rpc": "^2.9.1",
Expand Down
14 changes: 10 additions & 4 deletions packages/client/src/impl/QueryApiImpl.ts
Original file line number Diff line number Diff line change
Expand Up @@ -88,9 +88,11 @@ export default class QueryApiImpl implements QueryApi {

const meta = this.prepareMetadata(options.headers)
const rpcOptions: RpcOptions = {meta}

const getStart = Date.now();
const flightDataStream = client.doGet(ticket, rpcOptions)

const getEnd = Date.now();
console.log(`get time: ${getEnd - getStart}ms`);
const start = Date.now();
const binaryStream = (async function* () {
for await (const flightData of flightDataStream.responses) {
// Include the length of dataHeader for the reader.
Expand All @@ -102,7 +104,8 @@ export default class QueryApiImpl implements QueryApi {
})()

const reader = await RecordBatchReader.from(binaryStream)

const end = Date.now();
console.log(`record batch reader time: ${end - start}ms`);
yield* reader
}

Expand All @@ -112,7 +115,7 @@ export default class QueryApiImpl implements QueryApi {
options: QueryOptions
): AsyncGenerator<Record<string, any>, void, void> {
const batches = this._queryRawBatches(query, database, options)

const start = Date.now();
for await (const batch of batches) {
const row: Record<string, any> = {}
for (const batchRow of batch) {
Expand All @@ -122,8 +125,11 @@ export default class QueryApiImpl implements QueryApi {
yield row
}
}
const end = Date.now();
console.log(`query response time: ${end - start}ms`);
}


async *queryPoints(
query: string,
database: string,
Expand Down
137 changes: 137 additions & 0 deletions packages/client/test/integration/client.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
import {InfluxDBClient} from '../../src'
import { ClientOptions, createFlightSqlClient } from '@lakehouse-rs/flight-sql-client';
import {RecordBatchReader, tableFromIPC} from 'apache-arrow';

/*
const DB_HOST = process.env.HOST;
const DB_TOKEN = process.env.TOKEN;
const DB_DB = process.env.BUCKET_NAME;
*/
const DB_HOST = "https://us-east-1-1.aws.cloud2.influxdata.com";
const DB_TOKEN = "jGVT56tt_eeD7WDfh0y945R6r54k3XYmdLWLjJU0vpWsJbS7PqbwLFX58vfgucEOx5jr_B9v6Ap-myu4oVnirg==";
const DB_DB = "samans-bucket";

//query = `SELECT usage_user FROM "cpu_host_max" WHERE time > now() - interval '60 minute'`; //This takes ~0.26 seconds
//const queryShort = `SELECT * FROM win_cpu WHERE time > now() - interval '3 hours'`; //This takes 2+ seconds
const queryLong = `SELECT * FROM win_cpu WHERE time > now() - interval '2 days'`; //This takes 8+ seconds

describe('benchmark', () => {
it('query data', async () => {
// Create an instance of InfluxDB client
const client = new InfluxDBClient({
host: DB_HOST,
token: DB_TOKEN,
database: DB_DB
});

const run = async (query: string, limit: number) => {
console.log(`running - ${query}`);

let totalTime = 0;
const arr = [];
for (let i = 0; i < 5; i++) {
const start = Date.now();
try {
const result = await client.query(query);
//const result = await client.queryPoints(query);
console.log(`reading start`)
for await (const val of result) {
arr.push(val);
}
} catch (error) {
console.error('Error executing query:', error);
}

//fs.writeFileSync('output.txt', JSON.stringify(arr));
const end = Date.now();
const elapsed = (end - start) / 1000;
console.log(`iter: ${i} \t ${elapsed.toFixed(6)}s`);
totalTime += elapsed;
}
console.log(`avg time: ${(totalTime / 5).toFixed(6)}s`);
}
var limit : number = 0;



for (let n = 1; n <= 2; n += 500) {
await run(queryLong, limit);
await new Promise((resolve) => setTimeout(resolve, 10000));
}
}).timeout(300_000)
it('query data Flight Recordbatchreader', async () => {
const options: ClientOptions = {
tls: true,
//host: DB_HOST,
host: "us-east-1-1.aws.cloud2.influxdata.com",
port: 443,
token: DB_TOKEN,
headers: [{key: "database", value: DB_DB}],
};
console.log("creating client");
const client = await createFlightSqlClient(options);
console.log("executing query");
const start = Date.now();
const buffer = await client.query(queryLong);
const after = Date.now();
console.log("converting to table");
const batches = await RecordBatchReader.from(buffer);
var rows = [];
for await (const batch of batches) {
const row: Record<string, any> = {}
for (const batchRow of batch) {
for (const column of batch.schema.fields) {
row[column.name] = batchRow[column.name]
}
rows.push(row);
}
}
//const table = tableFromIPC(buffer);
const end = Date.now();
console.log(`query time: ${after - start}ms`);
console.log(`table time: ${end - after}ms`);
//console.log(table);
console.log(`rows: ${rows.length}`);
}).timeout(300_000)
it('query data Flight tablefromIpc', async () => {
const options: ClientOptions = {
tls: true,
//host: DB_HOST,
host: "us-east-1-1.aws.cloud2.influxdata.com",
port: 443,
token: DB_TOKEN,
headers: [{key: "database", value: DB_DB}],
};
console.log("creating client");
const client = await createFlightSqlClient(options);
console.log("executing query");
const start = Date.now();
const buffer = await client.query(queryLong);
const after = Date.now();
console.log("converting to table");
const table = tableFromIPC(buffer);

var rows = [];
for await (const batch of table.batches) {
const row: Record<string, any> = {}
for (const batchRow of batch) {
for (const column of batch.schema.fields) {
row[column.name] = batchRow[column.name]
}
rows.push(row);
}
}

const end = Date.now();
console.log(`query time: ${after - start}ms`);
console.log(`table time: ${end - after}ms`);
//console.log(table);
console.log(`table schema: ${table.schema}`);
console.log(`rows: ${rows.length}`);
}).timeout(300_000)

})

describe('flight-sql-client', () => {

})
Loading

0 comments on commit a51b569

Please sign in to comment.