Skip to content

Commit

Permalink
Merge branch 'delta-hq:main' into main
Browse files Browse the repository at this point in the history
  • Loading branch information
LayneHaber authored Apr 22, 2024
2 parents cad51aa + 7899953 commit 49e6549
Show file tree
Hide file tree
Showing 13 changed files with 2,897 additions and 18 deletions.
98 changes: 80 additions & 18 deletions adapters/izumiswap/src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import { promisify } from 'util';
import stream from 'stream';
import csv from 'csv-parser';
import fs from 'fs';
import { format } from 'fast-csv';
import { write } from 'fast-csv';

//Uncomment the following lines to test the getPositionAtBlock function
Expand Down Expand Up @@ -45,20 +44,20 @@ interface CSVRow {

const pipeline = promisify(stream.pipeline);

const readBlocksFromCSV = async (filePath: string): Promise<number[]> => {
const blocks: number[] = [];
await pipeline(
fs.createReadStream(filePath),
csv(),
async function* (source) {
for await (const chunk of source) {
// Assuming each row in the CSV has a column 'block' with the block number
if (chunk.block) blocks.push(parseInt(chunk.block, 10));
}
}
);
return blocks;
};
// const readBlocksFromCSV = async (filePath: string): Promise<number[]> => {
// const blocks: number[] = [];
// await pipeline(
// fs.createReadStream(filePath),
// csv(),
// async function* (source) {
// for await (const chunk of source) {
// // Assuming each row in the CSV has a column 'block' with the block number
// if (chunk.block) blocks.push(parseInt(chunk.block, 10));
// }
// }
// );
// return blocks;
// };


const getData = async () => {
Expand Down Expand Up @@ -132,6 +131,69 @@ export const getUserTVLByBlock = async (blocks: BlockData) => {
return csvRows
};

getData().then(() => {
console.log("Done");
});
// getData().then(() => {
// console.log("Done");
// });

/**
 * Parses a CSV file of blocks into BlockData entries.
 *
 * Expects columns named `number` and `timestamp`. Rows whose block number is
 * not numeric, or whose timestamp parses to a falsy value (NaN or 0), are
 * silently skipped.
 *
 * @param filePath path to the CSV file to read
 * @returns all valid block number/timestamp pairs, in file order
 */
const readBlocksFromCSV = async (filePath: string): Promise<BlockData[]> => {
  return new Promise<BlockData[]>((resolve, reject) => {
    const parsed: BlockData[] = [];
    fs.createReadStream(filePath)
      .pipe(csv())
      .on('data', (row) => {
        const parsedNumber = parseInt(row.number, 10);
        const parsedTimestamp = parseInt(row.timestamp, 10);
        // A NaN timestamp is falsy, so this one truthiness check covers both
        // the unparseable and the zero cases.
        if (!isNaN(parsedNumber) && parsedTimestamp) {
          parsed.push({ blockNumber: parsedNumber, blockTimestamp: parsedTimestamp });
        }
      })
      .on('end', () => resolve(parsed))
      .on('error', reject);
  });
};


// Entry point: read the hourly block list, compute TVL rows for each block,
// and stream them into outputData.csv in batches.
readBlocksFromCSV('src/hourly_blocks.csv').then(async (blocks) => {
  console.log(blocks);
  const allCsvRows: any[] = []; // Rows accumulated since the last flush
  const batchSize = 1000; // Number of blocks to process before flushing to disk
  let blocksProcessed = 0;
  let isFirstWrite = true; // First flush truncates the file and emits headers

  for (const block of blocks) {
    try {
      const result = await getUserTVLByBlock(block);

      // Accumulate CSV rows for this block
      allCsvRows.push(...result);
      blocksProcessed++;

      // Flush when a full batch has accumulated or after the final block.
      if (blocksProcessed % batchSize === 0 || blocksProcessed === blocks.length) {
        // BUG FIX: the original pushed a hand-built header row into the data
        // AND keyed `flags`/`headers` on i === batchSize, so runs shorter than
        // one batch appended ('a') headerless rows to a possibly stale file,
        // while exact multiples of the batch size emitted duplicate headers.
        // Track the first write explicitly and let fast-csv emit headers once.
        // BUG FIX: the write is now awaited, so consecutive appends cannot
        // interleave and the process cannot exit before the stream finishes.
        await new Promise<void>((resolve, reject) => {
          const ws = fs.createWriteStream(`outputData.csv`, { flags: isFirstWrite ? 'w' : 'a' });
          write(allCsvRows, { headers: isFirstWrite })
            .pipe(ws)
            .on("finish", () => {
              console.log(`CSV file has been written.`);
              resolve();
            })
            .on("error", reject);
        });
        isFirstWrite = false;
        // Clear the accumulated CSV rows
        allCsvRows.length = 0;
      }
    } catch (error) {
      // BUG FIX: `${block}` stringified to "[object Object]"; log the number.
      console.error(`An error occurred for block ${block.blockNumber}:`, error);
    }
  }
}).catch((err) => {
  console.error('Error reading CSV file:', err);
});
24 changes: 24 additions & 0 deletions adapters/lyve/package.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
{
"name": "lyve",
"version": "1.0.0",
"description": "",
"main": "index.js",
"scripts": {
"test": "echo \"Error: no test specified\" && exit 1",
"start": "node dist/index.js",
"compile": "tsc",
"watch": "tsc -w",
"clear": "rm -rf dist"
},
"keywords": [],
"author": "",
"license": "UNLICENSED",
"dependencies": {
"fast-csv": "^5.0.1",
"node-fetch": "^3.3.2"
},
"devDependencies": {
"@types/node": "^20.11.30",
"typescript": "^5.4.3"
}
}
136 changes: 136 additions & 0 deletions adapters/lyve/src/index.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import * as fs from "fs";
import { write } from "fast-csv";


// One row of the TVL output CSV: one user's balance of one token at one block.
type OutputDataSchemaRow = {
  block_number: number;
  timestamp: number;       // block timestamp — presumably unix seconds; TODO confirm against input CSV
  user_address: string;
  token_address: string;
  token_balance: number;   // NOTE(review): subgraph values are assigned here without conversion — verify they are numeric
  token_symbol: string;
  usd_price: number;       // always 0 in this adapter (no price lookup performed)
};

// Public Linea mainnet RPC endpoint (not referenced elsewhere in this file).
const LINEA_RPC = "https://rpc.linea.build";
// LYU token contract address — reported as the token for stability-pool deposits.
const LYU_ADDRESS = "0xb20116eE399f15647BB1eEf9A74f6ef3b58bc951";

// The Graph studio endpoint serving Lyve TVL data.
const LYVE_SUBGRAPH_QUERY_URL = "https://api.studio.thegraph.com/query/53783/lyve-lp-tvl/version/latest";

// Top 1000 stability-pool deposits, largest first.
// NOTE(review): `first: 1000` caps the result set — deposits beyond the first
// 1000 are silently dropped; no pagination is performed.
const LYVE_STABILITY_POOL_QUERY = `
    query StabilityPoolQuery {
        userDeposits(first: 1000,orderBy: _newDeposit, orderDirection: desc) {
            _depositor,
            _newDeposit
        }
    }
`;

// Open vessels (_status: 0) with their full collateral-update history.
// NOTE(review): same 1000-item cap as above applies here.
const _VESSELS_QUERY = `
    query VesselQuery {
        vessels(first: 1000,where: { _status: 0 }) {
            id
            _borrower
            _asset
            updates {
                _coll
                blockTimestamp
            }
        }
    }
`;

/**
 * POSTs a JSON payload to `url` and returns the parsed JSON response.
 *
 * @param url  endpoint to POST to (the Lyve subgraph in this file)
 * @param data request body, serialized with JSON.stringify
 * @returns the parsed response body
 * @throws Error when the server answers with a non-2xx status
 */
const post = async (url: string, data: any): Promise<any> => {
  const response = await fetch(url, {
    method: "POST",
    headers: {
      "Content-Type": "application/json",
      Accept: "application/json",
    },
    body: JSON.stringify(data),
  });
  // BUG FIX: the original ignored HTTP errors, so callers crashed later with a
  // confusing TypeError on `responseJson.data.*`. Fail fast with context.
  if (!response.ok) {
    throw new Error(`POST ${url} failed: ${response.status} ${response.statusText}`);
  }
  return await response.json();
};

/**
 * Fetches the top stability-pool deposits from the subgraph and converts each
 * depositor into one output row (token = LYU, usd_price = 0).
 *
 * @param blockNumber    block height stamped onto every row
 * @param blockTimestamp block timestamp stamped onto every row
 * @returns one row per depositor returned by the query
 */
const getStabilityPoolData = async (blockNumber: number, blockTimestamp: number): Promise<OutputDataSchemaRow[]> => {
  const responseJson = await post(LYVE_SUBGRAPH_QUERY_URL, { query: LYVE_STABILITY_POOL_QUERY });
  // NOTE(review): _newDeposit is passed through unconverted even though the
  // row type declares token_balance as number — confirm the subgraph's type.
  return responseJson.data.userDeposits.map((deposit: any): OutputDataSchemaRow => ({
    block_number: blockNumber,
    timestamp: blockTimestamp,
    user_address: deposit._depositor,
    token_address: LYU_ADDRESS,
    token_balance: deposit._newDeposit,
    token_symbol: "LYU",
    usd_price: 0,
  }));
};
/**
 * Fetches open vessels from the subgraph and reports each borrower's most
 * recent collateral amount as one output row (token_symbol left empty,
 * usd_price = 0).
 *
 * @param blockNumber    block height stamped onto every row
 * @param blockTimestamp block timestamp stamped onto every row
 * @returns one row per vessel that has at least one collateral update
 */
const getVesselDepositsData = async (blockNumber: number, blockTimestamp: number): Promise<OutputDataSchemaRow[]> => {
  const csvRows: OutputDataSchemaRow[] = [];
  const responseJson = await post(LYVE_SUBGRAPH_QUERY_URL, { query: _VESSELS_QUERY });
  for (const vessel of responseJson.data.vessels) {
    // Copy before sorting: Array.prototype.sort mutates in place, and the
    // original sorted the response object itself. Newest update first.
    const sortedUpdates = [...vessel.updates].sort((a: any, b: any) => b.blockTimestamp - a.blockTimestamp);
    const latestUpdate = sortedUpdates[0];
    // BUG FIX: the original read sortedUpdates[0]._coll unguarded, so a vessel
    // with an empty `updates` array threw and aborted the entire batch.
    if (!latestUpdate) continue;
    csvRows.push({
      block_number: blockNumber,
      timestamp: blockTimestamp,
      user_address: vessel._borrower,
      token_address: vessel._asset,
      token_balance: latestUpdate._coll,
      token_symbol: "",
      usd_price: 0,
    });
  }
  return csvRows;
};
// A block height paired with its timestamp, as read from the input CSV.
interface BlockData {
  blockNumber: number;
  blockTimestamp: number;
}
export const main = async (blocks: BlockData[]) => {
const allCsvRows: any[] = []; // Array to accumulate CSV rows for all blocks
const batchSize = 10; // Size of batch to trigger writing to the file
let i = 0;

for (const { blockNumber, blockTimestamp } of blocks) {
try {
// Retrieve data using block number and timestamp
const csvRowsStabilityPool = await getStabilityPoolData(blockNumber, blockTimestamp);
const csvRowsVessels = await getVesselDepositsData(blockNumber, blockTimestamp);
const csvRows = csvRowsStabilityPool.concat(csvRowsVessels);

// Accumulate CSV rows for all blocks
allCsvRows.push(...csvRows);

i++;
console.log(`Processed block ${i}`);

// Write to file when batch size is reached or at the end of loop
if (i % batchSize === 0 || i === blocks.length) {
const ws = fs.createWriteStream(`outputData.csv`, { flags: i === batchSize ? 'w' : 'a' });
write(allCsvRows, { headers: i === batchSize ? true : false })
.pipe(ws)
.on("finish", () => {
console.log(`CSV file has been written.`);
});

// Clear the accumulated CSV rows
allCsvRows.length = 0;
}
} catch (error) {
console.error(`An error occurred for block ${blockNumber}:`, error);
}
}
};

/**
 * Computes all TVL rows for a single block: stability-pool deposits followed
 * by open-vessel collateral.
 *
 * @param blocks block number/timestamp pair to query the subgraph at
 * @returns rows from both sources, stability-pool rows first
 */
export const getUserTVLByBlock = async (blocks: BlockData) => {
  const { blockNumber, blockTimestamp } = blocks;
  // The two subgraph queries are independent; issue them in parallel instead
  // of awaiting them sequentially.
  const [csvRowsStabilityPool, csvRowsVessels] = await Promise.all([
    getStabilityPoolData(blockNumber, blockTimestamp),
    getVesselDepositsData(blockNumber, blockTimestamp),
  ]);
  return csvRowsStabilityPool.concat(csvRowsVessels);
};
Loading

0 comments on commit 49e6549

Please sign in to comment.