Skip to content

Commit

Permalink
Merge pull request bitfinexcom#324 from bitfinexcom/staging
Browse files Browse the repository at this point in the history
Release version to master
  • Loading branch information
prdn authored Oct 4, 2023
2 parents 063024a + 58d6116 commit 5411e01
Show file tree
Hide file tree
Showing 25 changed files with 655 additions and 411 deletions.
2 changes: 1 addition & 1 deletion package.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
{
"name": "bfx-reports-framework",
"version": "4.8.1",
"version": "4.9.0",
"description": "Bitfinex reports framework",
"main": "worker.js",
"license": "Apache-2.0",
Expand Down
3 changes: 2 additions & 1 deletion test/helpers/helpers.mock-rest-v2.js
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,8 @@ const getMockDataOpts = () => ({
generate_token: null,
delete_token: null,
login: null,
login_verify: null
login_verify: null,
platform_status: null
})

const getExtraMockMethods = () => (new Map([
Expand Down
4 changes: 4 additions & 0 deletions test/helpers/mock-data.js
Original file line number Diff line number Diff line change
Expand Up @@ -445,5 +445,9 @@ module.exports = new Map([
'look at this note'
]
]
],
[
'platform_status',
[1]
]
])
20 changes: 20 additions & 0 deletions test/test-cases/api-sync-mode-sqlite-test-cases.js
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,26 @@ module.exports = (
} = params
const auth = { token: '' }

it('it should be successfully performed by the getPlatformStatus method', async function () {
this.timeout(5000)

const res = await agent
.post(`${basePath}/json-rpc`)
.type('json')
.send({
method: 'getPlatformStatus',
id: 5
})
.expect('Content-Type', /json/)
.expect(200)

assert.isObject(res.body)
assert.propertyVal(res.body, 'id', 5)
assert.isObject(res.body.result)
assert.isBoolean(res.body.result.isMaintenance)
assert.isNotOk(res.body.result.isMaintenance)
})

it('it should be successfully performed by the pingApi method', async function () {
this.timeout(5000)

Expand Down
14 changes: 14 additions & 0 deletions test/test-cases/get-sync-progress-test-case.js
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,21 @@ module.exports = (
assert.propertyVal(res.body, 'id', 5)
assert.isObject(res.body.result)
assert.isNumber(res.body.result.progress)
assert.isBoolean(res.body.result.isSyncInProgress)

assert.isOk(
res.body.result.error === null ||
typeof res.body.result.error === 'string'
)
assert.isOk(
res.body.result.state === null ||
[
'SYNCHRONIZATION_HAS_NOT_BEEN_STARTED_YET',
'SYNCHRONIZATION_IS_STARTED',
'SYNCHRONIZATION_IS_FINISHED',
'SYNCHRONIZATION_HAS_BEEN_INTERRUPTED'
].some((state) => state === res.body.result.state)
)
assert.isOk(
res.body.result.syncStartedAt === null ||
Number.isInteger(res.body.result.syncStartedAt)
Expand Down
2 changes: 2 additions & 0 deletions workers/api.framework.report.wrk.js
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,9 @@ class WrkReportFrameWorkApi extends WrkReportServiceApi {
await super.stopService()

const wsTransport = this.container.get(TYPES.WSTransport)
const processMessageManager = this.container.get(TYPES.ProcessMessageManagerFactory)()

processMessageManager.stop()
wsTransport.stop()
}
}
Expand Down
3 changes: 2 additions & 1 deletion workers/loc.api/bfx.api.router/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ class BfxApiRouter extends BaseBfxApiRouter {
['fundingCreditHistory', 90],
['accountSummary', 90],
['logins', 90],
['changeLogs', 90]
['changeLogs', 90],
['status', 30]
])
}

Expand Down
3 changes: 2 additions & 1 deletion workers/loc.api/di/app.deps.js
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,8 @@ module.exports = ({
['_dataConsistencyChecker', TYPES.DataConsistencyChecker],
['_winLossVSAccountBalance', TYPES.WinLossVSAccountBalance],
['_getDataFromApi', TYPES.GetDataFromApi],
['_httpRequest', TYPES.HTTPRequest]
['_httpRequest', TYPES.HTTPRequest],
['_wsEventEmitterFactory', TYPES.WSEventEmitterFactory]
]
})
rebind(TYPES.RServiceDepsSchemaAliase)
Expand Down
Original file line number Diff line number Diff line change
@@ -1,13 +1,11 @@
'use strict'

const { pipeline } = require('stream')
const { stringify } = require('csv')

const {
write
} = require('bfx-report/workers/loc.api/queue/write-data-to-stream/helpers')

const nope = () => {}
const {
streamWriter
} = require('bfx-report/workers/loc.api/generate-csv/csv-writer/helpers')

module.exports = (
rService,
Expand Down Expand Up @@ -35,79 +33,19 @@ module.exports = (
queue.emit('progress', 0)

if (typeof jobData === 'string') {
const stringifier = stringify(
{ columns: ['mess'] }
await streamWriter(
wStream,
[{
columnParams: { columns: ['mess'] },
writeFn: (stream) => write([{ mess: jobData }], stream)
}]
)

pipeline(stringifier, wStream, nope)
write([{ mess: jobData }], stringifier)
queue.emit('progress', 100)
stringifier.end()

return
}

wStream.setMaxListeners(50)

const timestampsStringifier = stringify({
header: true,
columns: columnsCsv.timestamps
})
const posNameStringifier = stringify(
{ columns: ['name'] }
)
const posStringifier = stringify({
header: true,
columns: columnsCsv.positionsSnapshot
})
const positionsTotalPlUsdStringifier = stringify({
header: true,
columns: columnsCsv.positionsTotalPlUsd
})
const walletsNameStringifier = stringify(
{ columns: ['name'] }
)
const walletsStringifier = stringify({
header: true,
columns: columnsCsv.walletsSnapshot
})
const walletsTotalBalanceUsdStringifier = stringify({
header: true,
columns: columnsCsv.walletsTotalBalanceUsd
})
const positionsTickersNameStringifier = stringify(
{ columns: ['name'] }
)
const positionsTickersStringifier = stringify({
header: true,
columns: columnsCsv.positionsTickers
})
const walletsTickersNameStringifier = stringify(
{ columns: ['name'] }
)
const walletsTickersStringifier = stringify({
header: true,
columns: columnsCsv.walletsTickers
})

const rStreamArr = [
timestampsStringifier,
posNameStringifier,
posStringifier,
positionsTotalPlUsdStringifier,
walletsNameStringifier,
walletsStringifier,
walletsTotalBalanceUsdStringifier,
positionsTickersNameStringifier,
positionsTickersStringifier,
walletsTickersNameStringifier,
walletsTickersStringifier
]

rStreamArr.forEach((rStream) => {
pipeline(rStream, wStream, nope)
})

const res = await getDataFromApi({
getData: rService[name].bind(rService),
args,
Expand All @@ -121,65 +59,115 @@ module.exports = (
walletsTickers,
positionsTotalPlUsd,
walletsTotalBalanceUsd
} = { ...res }
} = res ?? {}

write(
[{ mtsCreated, end }, {}],
timestampsStringifier,
formatSettings.timestamps,
params
)
write([{ name: 'POSITIONS' }], posNameStringifier)
write(
[...positionsSnapshot, {}],
posStringifier,
formatSettings.positionsSnapshot,
params
)
write(
[{ plUsd: positionsTotalPlUsd }, {}],
positionsTotalPlUsdStringifier,
formatSettings.positionsTotalPlUsd,
params
)
write([{ name: 'WALLETS' }], walletsNameStringifier)
write(
[...walletsSnapshot, {}],
walletsStringifier,
formatSettings.walletsSnapshot,
params
)
write(
[{ balanceUsd: walletsTotalBalanceUsd }, {}],
walletsTotalBalanceUsdStringifier,
formatSettings.walletsTotalBalanceUsd,
params
)
write([{ name: 'POSITIONS TICKERS' }], positionsTickersNameStringifier)
write(
[...positionsTickers, {}],
positionsTickersStringifier,
formatSettings.positionsTickers,
params
)
write([{ name: 'WALLETS TICKERS' }], walletsTickersNameStringifier)
write(
walletsTickers,
walletsTickersStringifier,
formatSettings.walletsTickers,
params
wStream.setMaxListeners(50)

await streamWriter(
wStream,
[
{
columnParams: {
header: true,
columns: columnsCsv.timestamps
},
writeFn: (stream) => write(
[{ mtsCreated, end }, {}],
stream,
formatSettings.timestamps,
params
)
},
{
columnParams: { columns: ['name'] },
writeFn: (stream) => write([{ name: 'POSITIONS' }], stream)
},
{
columnParams: {
header: true,
columns: columnsCsv.positionsSnapshot
},
writeFn: (stream) => write(
[...positionsSnapshot, {}],
stream,
formatSettings.positionsSnapshot,
params
)
},
{
columnParams: {
header: true,
columns: columnsCsv.positionsTotalPlUsd
},
writeFn: (stream) => write(
[{ plUsd: positionsTotalPlUsd }, {}],
stream,
formatSettings.positionsTotalPlUsd,
params
)
},
{
columnParams: { columns: ['name'] },
writeFn: (stream) => write([{ name: 'WALLETS' }], stream)
},
{
columnParams: {
header: true,
columns: columnsCsv.walletsSnapshot
},
writeFn: (stream) => write(
[...walletsSnapshot, {}],
stream,
formatSettings.walletsSnapshot,
params
)
},
{
columnParams: {
header: true,
columns: columnsCsv.walletsTotalBalanceUsd
},
writeFn: (stream) => write(
[{ balanceUsd: walletsTotalBalanceUsd }, {}],
stream,
formatSettings.walletsTotalBalanceUsd,
params
)
},
{
columnParams: { columns: ['name'] },
writeFn: (stream) => write([{ name: 'POSITIONS TICKERS' }], stream)
},
{
columnParams: {
header: true,
columns: columnsCsv.positionsTickers
},
writeFn: (stream) => write(
[...positionsTickers, {}],
stream,
formatSettings.positionsTickers,
params
)
},
{
columnParams: { columns: ['name'] },
writeFn: (stream) => write([{ name: 'WALLETS TICKERS' }], stream)
},
{
columnParams: {
header: true,
columns: columnsCsv.walletsTickers
},
writeFn: (stream) => write(
walletsTickers,
stream,
formatSettings.walletsTickers,
params
)
}
]
)

queue.emit('progress', 100)

posNameStringifier.end()
posStringifier.end()
positionsTotalPlUsdStringifier.end()
walletsNameStringifier.end()
walletsStringifier.end()
walletsTotalBalanceUsdStringifier.end()
positionsTickersNameStringifier.end()
positionsTickersStringifier.end()
walletsTickersNameStringifier.end()
walletsTickersStringifier.end()
}
Loading

0 comments on commit 5411e01

Please sign in to comment.