Skip to content

Commit

Permalink
add myx adapter
Browse files Browse the repository at this point in the history
  • Loading branch information
marvin6en committed May 16, 2024
1 parent f2b4bf8 commit 5543814
Show file tree
Hide file tree
Showing 2 changed files with 81 additions and 62 deletions.
114 changes: 71 additions & 43 deletions adapters/myx/src/index.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,27 @@
import { getPositionsForAddressByPoolAtBlock as getSyncSwapPositionsForAddressByPoolAtBlock} from "./sdk/positionSnapshots"
import {
getPositionsForAddressByPoolAtBlock as getSyncSwapPositionsForAddressByPoolAtBlock
} from "./sdk/positionSnapshots"

import { promisify } from 'util';
import {promisify} from 'util';
import stream from 'stream';
import csv from 'csv-parser';
import fs from 'fs';
import { write } from 'fast-csv';
import {write} from 'fast-csv';


interface CSVRow {
block_number: number
timestamp: string
user_address: string
token_address: string
token_symbol: string
token_balance: string
usd_price: string
export interface OutputDataSchemaRow {
block_number: number; //block_number which was given as input
timestamp: number; // block timestamp which was given an input, epoch format
user_address: string; // wallet address, all lowercase
token_address: string; // token address all lowercase
token_balance: bigint; // token balance, raw amount. Please dont divide by decimals
token_symbol: string; //token symbol should be empty string if it is not available
usd_price: number; //assign 0 if not available
}

interface BlockData {
blockNumber: number;
blockTimestamp: number;
}


Expand All @@ -23,44 +30,65 @@ const pipeline = promisify(stream.pipeline);
// Assuming you have the following functions and constants already defined
// getPositionsForAddressByPoolAtBlock, CHAINS, PROTOCOLS, AMM_TYPES, getPositionDetailsFromPosition, getLPValueByUserAndPoolFromPositions, BigNumber

const readBlocksFromCSV = async (filePath: string): Promise<number[]> => {
const blocks: number[] = [];
await pipeline(
fs.createReadStream(filePath),
csv(),
async function* (source) {
for await (const chunk of source) {
// Assuming each row in the CSV has a column 'block' with the block number
if (chunk.block) blocks.push(parseInt(chunk.block, 10));
}
}
);
const readBlocksFromCSV = async (filePath: string): Promise<BlockData[]> => {
const blocks: BlockData[] = [];

await new Promise<void>((resolve, reject) => {
fs.createReadStream(filePath)
.pipe(csv()) // Specify the separator as '\t' for TSV files
.on('data', (row) => {
const blockNumber = parseInt(row.number, 10);
const blockTimestamp = parseInt(row.timestamp, 10);
if (!isNaN(blockNumber) && blockTimestamp) {
blocks.push({ blockNumber: blockNumber, blockTimestamp });
}
})
.on('end', () => {
resolve();
})
.on('error', (err) => {
reject(err);
});
});

return blocks;
};

export const getUserTVLByBlock = async (blocks: BlockData) => {
const {blockNumber, blockTimestamp} = blocks
// Retrieve data using block number and timestamp

const getData = async () => {
const snapshotBlocks = [
4605383
// Add more blocks as needed
]; //await readBlocksFromCSV('src/sdk/mode_chain_daily_blocks.csv');
const csvRows = await getSyncSwapPositionsForAddressByPoolAtBlock(blockNumber);
// console.log(csvRows);
return csvRows;
};

const csvRows: CSVRow[] = [];
readBlocksFromCSV('hourly_blocks.csv').then(async (blocks: any[]) => {
console.log(blocks);
const allCsvRows: any[] = [];

for (let block of snapshotBlocks) {
// SyncSwap Linea position snapshot
const rows = await getSyncSwapPositionsForAddressByPoolAtBlock(block)
rows.forEach((row) => csvRows.push(row as CSVRow))
}
// test
// const result = await getUserTVLByBlock({blockNumber:4605383, blockTimestamp: 1715864188});
// allCsvRows.push(...result);

// Write the CSV output to a file
const ws = fs.createWriteStream('outputData.csv');
write(csvRows, { headers: true }).pipe(ws).on('finish', () => {
console.log("CSV file has been written.");
for (const block of blocks) {
try {
const result = await getUserTVLByBlock(block);
allCsvRows.push(...result);
} catch (error) {
console.error(`An error occurred for block ${block}:`, error);
}
}
await new Promise((resolve, reject) => {
const ws = fs.createWriteStream(`outputData.csv`, {flags: 'w'});
write(allCsvRows, {headers: true})
.pipe(ws)
.on("finish", () => {
console.log(`CSV file has been written.`);
resolve;
});
});
};

getData().then(() => {
console.log("Done");
});

}).catch((err) => {
console.error('Error reading CSV file:', err);
});
29 changes: 10 additions & 19 deletions adapters/myx/src/sdk/positionSnapshots.ts
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
import {CHAINS, SUBGRAPH_URLS} from "./config";
import Decimal from "decimal.js";
import {OutputDataSchemaRow} from "../index";


interface LiquidityPositionSnapshot {
Expand Down Expand Up @@ -33,20 +34,10 @@ interface SubgraphResponse {
}
}

interface UserPositionSnapshotsAtBlockData {
block_number: number
timestamp: string
user_address: string
token_address: string
token_symbol: string
token_balance: string
usd_price: string
}

export const getPositionsForAddressByPoolAtBlock = async (
snapshotBlockNumber: number
): Promise<UserPositionSnapshotsAtBlockData[]> => {
const userPositionSnapshotsAtBlockData: UserPositionSnapshotsAtBlockData[] = []
): Promise<OutputDataSchemaRow[]> => {
const userPositionSnapshotsAtBlockData: OutputDataSchemaRow[] = []
let snapshotsArrays: LiquidityPositionSnapshot[] = []
const snapshotsMap = new Map<string, Map<string, LiquidityPositionSnapshot>>() // user => pool => snapshot
let skip = 0
Expand Down Expand Up @@ -115,28 +106,28 @@ export const getPositionsForAddressByPoolAtBlock = async (
userPositionSnapshotMap.forEach((positionSnapshot) => {
userPositionSnapshotsAtBlockData.push({
user_address: positionSnapshot.recipient,
timestamp: new Date(positionSnapshot.timestamp * 1000).toISOString(),
timestamp: Number(new Date(positionSnapshot.timestamp * 1000).toISOString()),
token_address: positionSnapshot.token0.address,
block_number: snapshotBlockNumber,
token_symbol: positionSnapshot.token0.symbol,
token_balance: new Decimal(positionSnapshot.token0Amount).toFixed(0),
usd_price: "0"
token_balance: BigInt(new Decimal(positionSnapshot.token0Amount).toFixed(0)),
usd_price: 0
})

const exists = userPositionSnapshotsAtBlockData.find((value) => {
return value.user_address == positionSnapshot.recipient && value.token_address == positionSnapshot.token1.address;
})
if (exists) {
exists.token_balance = new Decimal(positionSnapshot.token1Amount).add(exists.token_balance).toFixed(0);
exists.token_balance = BigInt(new Decimal(positionSnapshot.token1Amount).add(exists.token_balance.toString()).toFixed(0));
} else {
userPositionSnapshotsAtBlockData.push({
user_address: positionSnapshot.recipient,
timestamp: new Date(positionSnapshot.timestamp * 1000).toISOString(),
timestamp: Number(new Date(positionSnapshot.timestamp * 1000).toISOString()),
token_address: positionSnapshot.token1.address,
block_number: snapshotBlockNumber,
token_symbol: positionSnapshot.token1.symbol,
token_balance: new Decimal(positionSnapshot.token1Amount).toFixed(0),
usd_price: "0"
token_balance: BigInt(new Decimal(positionSnapshot.token1Amount).toFixed(0)),
usd_price: 0
})
}
})
Expand Down

0 comments on commit 5543814

Please sign in to comment.