Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Reduce rpc calls in updateSignerPenAndStatus (resolve #801) #802

Merged
merged 2 commits into from
Dec 27, 2023
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
262 changes: 168 additions & 94 deletions crawl.js
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ let cpValidator = 0

let tweetedMN = ''

async function watchValidator () {
async function watchValidator() {
var blockNumber = cpValidator || await web3.eth.getBlockNumber()
try {
blockNumber = blockNumber || await web3.eth.getBlockNumber()
Expand Down Expand Up @@ -160,7 +160,7 @@ async function watchValidator () {
}
}

async function updateCandidateInfo (candidate) {
async function updateCandidateInfo(candidate) {
try {
let capacity = await validator.methods.getCandidateCap(candidate).call()
let owner = (await validator.methods.getCandidateOwner(candidate).call() || '').toLowerCase()
Expand Down Expand Up @@ -206,7 +206,7 @@ async function updateCandidateInfo (candidate) {
}
}

async function updateVoterCap (candidate, voter) {
async function updateVoterCap(candidate, voter) {
try {
let capacity = await validator.methods.getVoterCap(candidate, voter).call()
logger.debug('Update voter %s for candidate %s capacity %s', voter, candidate, String(capacity))
Expand All @@ -229,7 +229,7 @@ async function updateVoterCap (candidate, voter) {
}

// Get current candates
async function getCurrentCandidates () {
async function getCurrentCandidates() {
try {
let candidates = await validator.methods.getCandidates().call()
let candidatesInDb = await db.Candidate.find({
Expand Down Expand Up @@ -257,115 +257,189 @@ async function getCurrentCandidates () {
}
}

async function updateSignerPenAndStatus () {
async function updateSignerPenAndStatus() {
try {
const latestBlockNumber = await web3.eth.getBlockNumber()
const latestCheckpoint = latestBlockNumber - (latestBlockNumber % parseInt(config.get('blockchain.epoch')))
const currentEpoch = (parseInt(latestCheckpoint / config.get('blockchain.epoch')) + 1).toString()
const blk = await web3.eth.getBlock(latestCheckpoint)
const signers = []
const penalties = []
let candidateBulkOps = []
let statusBulkOps = []

// get candidate list
const candidates = await db.Candidate.find({
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: {
$ne: 'RESIGNED'
}
})
// loop and get status
await Promise.all(candidates.map(async (c) => {
const data = {
'jsonrpc': '2.0',
'method': 'eth_getCandidateStatus',
'params': [c.candidate.toLowerCase(), 'latest'],
'id': config.get('blockchain.networkId')
},
{
candidate: 1,
status: 1
}
const response = await axios.post(config.get('blockchain.rpc'), data)

if (response.data) {
const result = (response.data.result || {}).status
switch (result) {
case 'MASTERNODE':
signers.push(c.candidate)
await db.Candidate.findOneAndUpdate({
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: c.candidate.toLowerCase()
}, {
$set: {
status: 'MASTERNODE'
}
}, { upsert: true })
await db.Status.findOneAndUpdate({ epoch: currentEpoch, candidate: c.candidate }, {
epoch: currentEpoch,
candidate: c.candidate,
status: 'MASTERNODE',
epochCreatedAt: moment.unix(blk.timestamp).utc()
}, { upsert: true })
break
case 'SLASHED':
logger.info('Update candidate %s slashed at blockNumber %s', c.candidate, String(blk.number))
// fireNotification
if (result.toLowerCase() !== c.status.toLowerCase()) {
// get all voters who have capacity > 0
const voters = await db.Voter.find({
candidate: c.candidate,
smartContractAddress: config.get('blockchain.validatorAddress'),
capacityNumber: { $gt: 0 }
)
const onchainCandidateParams = {
'jsonrpc': '2.0',
'method': 'eth_getCandidates',
'params': ['latest'],
'id': config.get('blockchain.networkId')
}
const response = await axios.post(config.get('blockchain.rpc'), onchainCandidateParams)
if (response.data && response.data.result) {
const allCandidates = response.data.result.candidates
if (!allCandidates || allCandidates.length == 0) {
logger.error('no onchain candidate found')
return
}
logger.info(`Onchain candidates length`, Object.keys(allCandidates).length)
logger.info(`Database candidates candidates`, candidates.length)
for (const c of candidates) {
const thisCandidate = allCandidates[ethUtils.toChecksumAddress(c.candidate)]
if (!thisCandidate) {
continue
}
switch (thisCandidate.status) {
case 'MASTERNODE':
signers.push(c.candidate)
candidateBulkOps.push({
updateOne: {
filter: {
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: c.candidate.toLowerCase()
},
update: {
$set: {
status: 'MASTERNODE'
}
},
upsert: true
}
})
if (voters && voters.length > 0) {
await Promise.all(voters.map(async (v) => {
await fireNotification(v.voter, c.candidate, c.name, 'Slash', latestBlockNumber)
}))
statusBulkOps.push({
updateOne: {
filter: {
epoch: currentEpoch, candidate: c.candidate
},
update: {
$set: {
epoch: currentEpoch,
candidate: c.candidate,
status: 'MASTERNODE',
epochCreatedAt: moment.unix(blk.timestamp).utc()
}
},
upsert: true
}
})
break
case 'SLASHED':
logger.info('Update candidate %s slashed at blockNumber %s', c.candidate, String(blk.number))
// fireNotification
if (thisCandidate.status.toLowerCase() !== c.status.toLowerCase()) {
// get all voters who have capacity > 0
const voters = await db.Voter.find({
candidate: c.candidate,
smartContractAddress: config.get('blockchain.validatorAddress'),
capacityNumber: { $gt: 0 }
})
if (voters && voters.length > 0) {
await Promise.all(voters.map(async (v) => {
await fireNotification(v.voter, c.candidate, c.name, 'Slash', latestBlockNumber)
}))
}
}
}
penalties.push(c.candidate)

candidateBulkOps.push({
updateOne: {
filter: {
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: c.candidate.toLowerCase()
},
update: {
$set: {
status: 'SLASHED'
}
},
upsert: true
}
})

db.Candidate.findOneAndUpdate({
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: c.candidate.toLowerCase()
}, {
$set: {
status: 'SLASHED'
}
}, { upsert: true }).then(() => true)
.catch(error => console.log(error))
statusBulkOps.push({
updateOne: {
filter: {
epoch: currentEpoch, candidate: c.candidate
},
update: {
$set: {
epoch: currentEpoch,
candidate: c.candidate,
status: 'SLASHED',
epochCreatedAt: moment.unix(blk.timestamp).utc()
}
},
upsert: true
}
})
break
case 'PROPOSED':
candidateBulkOps.push({
updateOne: {
filter: {
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: c.candidate.toLowerCase()
},
update: {
$set: {
status: 'PROPOSED'
}
},
upsert: true
}
})

statusBulkOps.push({
updateOne: {
filter: {
epoch: currentEpoch, candidate: c.candidate
},
update: {
$set: {
epoch: currentEpoch,
candidate: c.candidate,
status: 'PROPOSED',
epochCreatedAt: moment.unix(blk.timestamp).utc()
}
},
upsert: true
}
})
default:
break

db.Status.findOneAndUpdate({ epoch: currentEpoch, candidate: c.candidate }, {
epoch: currentEpoch,
candidate: c.candidate,
status: 'SLASHED',
epochCreatedAt: moment.unix(blk.timestamp).utc()
}, { upsert: true }).then(() => true)
.catch(error => console.log(error))
penalties.push(c.candidate)
break
case 'PROPOSED':
await db.Candidate.findOneAndUpdate({
smartContractAddress: config.get('blockchain.validatorAddress'),
candidate: c.candidate.toLowerCase()
}, {
$set: {
status: 'PROPOSED'
}
}, { upsert: true })
await db.Status.findOneAndUpdate({ epoch: currentEpoch, candidate: c.candidate }, {
epoch: currentEpoch,
candidate: c.candidate,
status: 'PROPOSED',
epochCreatedAt: moment.unix(blk.timestamp).utc()
}, { upsert: true })
break
default:
break
}
}
}))

}

if (candidateBulkOps.length > 0) {
const res = await db.Candidate.collection.bulkWrite(candidateBulkOps)
logger.debug(`Update candidates at block ${blk.number}, result ${JSON.stringify(res)}`)
}
if (statusBulkOps.length > 0) {
const res = await db.Status.collection.bulkWrite(statusBulkOps)
logger.debug(`Update statuses at block ${blk.number}, result ${JSON.stringify(res)}`)
}

await db.Signer.findOneAndUpdate({ blockNumber: blk.number }, {
networkId: config.get('blockchain.networkId'),
blockNumber: blk.number,
signers: signers
}, { upsert: true })

await db.Penalty.update({ epoch: currentEpoch }, {
await db.Penalty.updateOne({ epoch: currentEpoch }, {
networkId: config.get('blockchain.networkId'),
blockNumber: blk.number,
epoch: currentEpoch,
Expand All @@ -381,7 +455,7 @@ async function updateSignerPenAndStatus () {
}

let sleep = (time) => new Promise((resolve) => setTimeout(resolve, time))
async function watchNewBlock (n) {
async function watchNewBlock(n) {
try {
let blockNumber = await web3.eth.getBlockNumber()
n = n || blockNumber
Expand Down Expand Up @@ -505,7 +579,7 @@ async function watchNewBlock (n) {
return watchNewBlock(n)
}

async function fireNotification (voter, candidate, name, event, blockNumber, amount = '') {
async function fireNotification(voter, candidate, name, event, blockNumber, amount = '') {
try {
const isRead = false
await db.Notification.findOneAndUpdate({
Expand All @@ -526,7 +600,7 @@ async function fireNotification (voter, candidate, name, event, blockNumber, amo
}
}

function diff (a, b) {
function diff(a, b) {
return a.filter((i) => {
return b.indexOf(i) < 0
})
Expand Down Expand Up @@ -561,11 +635,11 @@ const getBlockSigners = async (number) => {
return []
}

async function updateLatestSignedBlock (blk) {
async function updateLatestSignedBlock(blk) {
try {
if (!blk ||
blk.number % parseInt(config.get('blockchain.blockSignerGap')) !==
parseInt(config.get('blockchain.blockSignerDelay'))
parseInt(config.get('blockchain.blockSignerDelay'))
) {
return
}
Expand All @@ -590,14 +664,14 @@ async function updateLatestSignedBlock (blk) {

if (bulkOps.length > 0) {
const res = await db.Candidate.collection.bulkWrite(bulkOps)
logger.debug(`UpdatelatestSignedBlock at block ${blk.number}, result ${res}`)
logger.debug(`UpdatelatestSignedBlock at block ${blk.number}, result ${JSON.stringify(res)}`)
}
} catch (e) {
logger.error('updateLatestSignedBlock %s', e)
}
}

async function getPastEvent () {
async function getPastEvent() {
let blockNumber = await web3.eth.getBlockNumber()
let lastBlockTx = await db.Transaction.findOne().sort({ blockNumber: -1 })
let lb = (lastBlockTx && lastBlockTx.blockNumber) ? lastBlockTx.blockNumber : 0
Expand Down