Skip to content

Commit

Permalink
refactoring tasks and util
Browse files Browse the repository at this point in the history
  • Loading branch information
TrevorJTClarke committed Jan 19, 2022
1 parent df79193 commit a352610
Show file tree
Hide file tree
Showing 4 changed files with 109 additions and 91 deletions.
31 changes: 9 additions & 22 deletions src/agent.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,9 @@
require('dotenv').config()
import * as config from './configuration'
import { utils } from 'near-api-js'
import Big from 'big.js'
import chalk from 'chalk'

const log = console.log
export const env = process.env.NODE_ENV || 'development'
export const near_env = process.env.NEAR_ENV || 'testnet'
export const LOG_LEVEL = process.env.LOG_LEVEL || 'info'
export const WAIT_INTERVAL_MS = process.env.WAIT_INTERVAL_MS ? parseInt(`${process.env.WAIT_INTERVAL_MS}`) : 30000
export const AGENT_ACCOUNT_ID = process.env.AGENT_ACCOUNT_ID || 'croncat-agent'
export const AGENT_MIN_TASK_BALANCE = utils.format.parseNearAmount(`${process.env.AGENT_MIN_TASK_BALANCE || '1'}`) // Default: 1_000_000_000_000_000_000_000_000 (1 NEAR)
export const AGENT_AUTO_REFILL = process.env.AGENT_AUTO_REFILL === 'true' ? true : false
export const AGENT_AUTO_RE_REGISTER = process.env.AGENT_AUTO_RE_REGISTER === 'true' ? true : false
export const BASE_GAS_FEE = 300000000000000
export const BASE_ATTACHED_PAYMENT = 0
export const BASE_REGISTER_AGENT_FEE = '4840000000000000000000'
let agentSettings = {}
let croncatSettings = {}

let cronManager = null
let agentAccount = null

export async function getAgentBalance() {
Expand Down Expand Up @@ -54,13 +39,13 @@ export async function registerAgent(agentId, payable_account_id, options) {
}
}

export async function getAgent(agentId, options) {
const manager = await getCronManager(null, options)
export async function getAgent(agentId) {
const manager = await getCronManager()
try {
const res = await manager.get_agent({ account_id: agentId || agentAccount })
const res = await manager.get_agent({ account_id: agentId })
return res
} catch (ge) {
if (LOG_LEVEL === 'debug') console.log(ge);
if (config.LOG_LEVEL === 'debug') console.log(ge);
}
}

Expand Down Expand Up @@ -119,12 +104,12 @@ export async function refillAgentTaskBalance(options) {
}
}


// Initialize the agent & all configs, returns TRUE if agent is active
export async function bootstrap(agentId, options) {
await connect(options)

// 1. Check for local signing keys, if none - generate new and halt until funded
agentAccount = `${await Near.getAccountCredentials(agentId || AGENT_ACCOUNT_ID)}`
agentAccount = `${await Near.getAccountCredentials(agentId || config.AGENT_ACCOUNT_ID)}`

// 2. Check for balance, if enough to execute txns, start main tasks
await checkAgentBalance(agentId)
Expand All @@ -148,4 +133,6 @@ export async function bootstrap(agentId, options) {
await registerAgent(agentId)
} else log(`No Agent: ${chalk.gray('Please register')}`)
}

return agentSettings ? true : false
}
7 changes: 4 additions & 3 deletions src/index.js
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,15 @@ import { bootstrap } from './agent'
import * as tasks from './tasks'
import * as triggers from './triggers'

// Cron Agent Task Loop
// Cron Agent Loops
(async () => {
const isActive = await bootstrap()

// TODO: Change agent register flow, to be in agent!
// agent.run()
// MAIN LOOPs
tasks.run()

// TODO: Move the following into agent?
if (isActive) tasks.run()
// TODO: Only run if agent bootstrap reveals agent active
if (config.BETA_FEATURES && isActive) triggers.run()
})()
160 changes: 95 additions & 65 deletions src/tasks.js
Original file line number Diff line number Diff line change
@@ -1,24 +1,9 @@
require('dotenv').config()
const contractAbi = require('../src/contract_abi.json')
import * as config from './configuration'
import * as agent from './agent'
import * as util from './util'
import { utils } from 'near-api-js'
import axios from 'axios'
import Big from 'big.js'
import NearProvider from './near'
import chalk from 'chalk'
import slack from './slack'

const log = console.log
export const env = process.env.NODE_ENV || 'development'
export const near_env = process.env.NEAR_ENV || 'testnet'
export const LOG_LEVEL = process.env.LOG_LEVEL || 'info'
export const WAIT_INTERVAL_MS = process.env.WAIT_INTERVAL_MS ? parseInt(`${process.env.WAIT_INTERVAL_MS}`) : 30000
export const AGENT_ACCOUNT_ID = process.env.AGENT_ACCOUNT_ID || 'croncat-agent'
export const AGENT_MIN_TASK_BALANCE = utils.format.parseNearAmount(`${process.env.AGENT_MIN_TASK_BALANCE || '1'}`) // Default: 1_000_000_000_000_000_000_000_000 (1 NEAR)
export const AGENT_AUTO_REFILL = process.env.AGENT_AUTO_REFILL === 'true' ? true : false
export const AGENT_AUTO_RE_REGISTER = process.env.AGENT_AUTO_RE_REGISTER === 'true' ? true : false
export const BASE_GAS_FEE = 300000000000000
export const BASE_ATTACHED_PAYMENT = 0
export const BASE_REGISTER_AGENT_FEE = '4840000000000000000000'

let agentSettings = {}
let croncatSettings = {}
let agentBalanceCheckIdx = 0
Expand Down Expand Up @@ -91,61 +76,77 @@ export async function rpcFunction(method, args, isView, gas = BASE_GAS_FEE, amou
}
}

export async function runTaskTick(options = {}) {
const manager = await getCronManager(null, options)
const agentId = options.accountId || options.account_id
let skipThisIteration = false
let totalTasks = 0
let previousAgentSettings = {...agentSettings}

export const pingAgentBalance = async () => {
// Logic will trigger on initial run, then every 5th txn
// NOTE: This is really only useful if the payout account is the same as the agent
if (AGENT_AUTO_REFILL && agentBalanceCheckIdx === 0) {
await checkAgentTaskBalance(options)
if (config.AGENT_AUTO_REFILL && agentBalanceCheckIdx === 0) {
await agent.checkAgentTaskBalance()

// Always ping heartbeat here, checks prefs above
await pingHeartbeat()
// Always ping heartbeat here, checks config
await util.pingHeartbeat()
}
agentBalanceCheckIdx++
if (agentBalanceCheckIdx > 5) agentBalanceCheckIdx = 0
}

// 1. Check for tasks
// returns if agent should skip next call or not
export const getTasks = async () => {
const manager = await util.getCronManager()
const agentId = agent.accountId()
let skipThisIteration = false
let totalTasks = 0
let taskRes

try {
// Only get task hashes my agent can execute
taskRes = await manager.get_agent_tasks({ account_id: agentId })
} catch (e) {
log(`${chalk.red('Connection interrupted, trying again soon...')}`)
if (LOG_LEVEL === 'debug') console.log('rpcFunction', e);
console.log(`${chalk.red('Connection interrupted, trying again soon...')}`)
if (config.LOG_LEVEL === 'debug') console.log('getTasks', e);
// Wait, then try loop again.
setTimeout(() => { runTaskTick(options) }, WAIT_INTERVAL_MS)
skipThisIteration = true
return;
}
totalTasks = parseInt(taskRes[0])
if (taskRes[1] === '0') log(`${chalk.gray(new Date().toISOString())} Available Tasks: ${chalk.red(totalTasks)}, Current Slot: ${chalk.red('Paused')}`)
else log(`${chalk.gray(new Date().toISOString())} ${chalk.gray('[' + options.networkId.toUpperCase() + ']')} Available Tasks: ${chalk.blueBright(totalTasks)}, Current Slot: ${chalk.yellow(taskRes[1])}`)
if (taskRes[1] === '0') console.log(`${chalk.gray(new Date().toISOString())} Available Tasks: ${chalk.red(totalTasks)}, Current Slot: ${chalk.red('Paused')}`)
else console.log(`${chalk.gray(new Date().toISOString())} ${chalk.gray('[' + options.networkId.toUpperCase() + ']')} Available Tasks: ${chalk.blueBright(totalTasks)}, Current Slot: ${chalk.yellow(taskRes[1])}`)

if (LOG_LEVEL === 'debug') console.log('taskRes', taskRes)
if (config.LOG_LEVEL === 'debug') console.log('taskRes', taskRes)
if (totalTasks <= 0) skipThisIteration = true

return skipThisIteration
}

// Checks if need to re-register agent based on tasks getting missed
export const reRegisterAgent = async () => {
if (!config.AGENT_AUTO_RE_REGISTER) process.exit(1)
await agent.reRegister()
}

// returns if agent should skip next call or not
// TODO: Check if this logic is always needed, or can be cached?
export const checkAgent = async () => {
let skipThisIteration = false
let previousAgentSettings = { ...agentSettings }

try {
agentSettings = await getAgent(agentId)
agentSettings = await agent.getAgent()
} catch (ae) {
agentSettings = {}
// if no status, trigger a delayed retry
setTimeout(() => { runTaskTick(options) }, WAIT_INTERVAL_MS)
return;
skipThisIteration = true
return skipThisIteration
}
// Check agent is active & able to run tasks
if (!agentSettings || !agentSettings.status || agentSettings.status !== 'Active') {
log(`Agent Status: ${chalk.white(agentSettings.status)}`)
console.log(`Agent Status: ${chalk.white(agentSettings.status)}`)
skipThisIteration = true
}

// Alert if agent changes status:
if (previousAgentSettings.status !== agentSettings.status) {
log(`Agent Status: ${chalk.white(agentSettings.status)}`)
await notifySlack(`*Agent Status Update:*\nYour agent is now a status of *${agentSettings.status}*`)
console.log(`Agent Status: ${chalk.white(agentSettings.status)}`)
await util.notifySlack(`*Agent Status Update:*\nYour agent is now a status of *${agentSettings.status}*`)

// TODO: At this point we could check if we need to re-register the agent if enough remaining balance, and status went from active to pending or none.
// NOTE: For now, stopping the process if no agent settings.
Expand All @@ -156,36 +157,65 @@ export async function runTaskTick(options = {}) {
let last_missed_slot = agentSettings.last_missed_slot;
if (last_missed_slot !== 0) {
if (last_missed_slot > (parseInt(taskRes[1]) + (croncatSettings.agents_eject_threshold * croncatSettings.slot_granularity))) {
log(`${chalk.red('Agent has been ejected! Too many slots missed!')}`)
await notifySlack(`*Agent has been ejected! Too many slots missed!*`)
const ejectMsg = 'Agent has been ejected! Too many slots missed!'
console.log(`${chalk.red(ejectMsg)}`)
await util.notifySlack(`*${ejectMsg}*`)
// TODO: Assess if re-register
process.exit(1);
}
}

// 2. Sign task and submit to chain
if (!skipThisIteration) {
try {
const res = await manager.proxy_call({
args: {},
gas: BASE_GAS_FEE,
amount: BASE_ATTACHED_PAYMENT,
})
if (LOG_LEVEL === 'debug') console.log(res)
// log(`${chalk.yellowBright('TX:' + res.transaction_outcome.id)}`)
} catch (e) {
if (LOG_LEVEL === 'debug') console.log('proxy_call issue', e)
// Check if the agent should slow down to wait for next slot opportunity
if (e.type && e.type === 'FunctionCallError') {
// Check if we need to skip iteration based on max calls in this slot, so we dont waste more fees.
if (e.kind.ExecutionError.search('Agent has exceeded execution for this slot') > -1) {
skipThisIteration = true
}
return skipThisIteration
}

// returns if agent should skip next call or not
export const proxyCall = async () => {
const manager = await util.getCronManager()
let skipThisIteration = false

try {
const res = await manager.proxy_call({
args: {},
gas: BASE_GAS_FEE,
amount: BASE_ATTACHED_PAYMENT,
})
if (config.LOG_LEVEL === 'debug') console.log(res)
if (config.LOG_LEVEL === 'debug') console.log(`${chalk.yellowBright('TX:' + res.transaction_outcome.id)}`)
} catch (e) {
if (config.LOG_LEVEL === 'debug') console.log('proxy_call issue', e)
// Check if the agent should slow down to wait for next slot opportunity
if (e.type && e.type === 'FunctionCallError') {
// Check if we need to skip iteration based on max calls in this slot, so we dont waste more fees.
if (e.kind.ExecutionError.search('Agent has exceeded execution for this slot') > -1) {
skipThisIteration = true
}
}
}

// Wait, then loop again.
return skipThisIteration
}

export async function run() {
let skipThisIteration = false

// 0. Check balance & ping
await pingAgentBalance()

// 1. Check for tasks
skipThisIteration = await getTasks()
if (skipThisIteration) return setTimeout(run, WAIT_INTERVAL_MS)

// 2. Check agent status
skipThisIteration = await checkAgent()
if (skipThisIteration) return setTimeout(run, WAIT_INTERVAL_MS)

// 3. Sign task and submit to chain
if (!skipThisIteration) {
skipThisIteration = await proxyCall()
}

// 4. Wait, then loop again.
// Run immediately if executed tasks remain for this slot, then sleep until next slot.
const nextAttemptInterval = skipThisIteration ? WAIT_INTERVAL_MS : 100
setTimeout(() => { runTaskTick(options) }, nextAttemptInterval)
setTimeout(run, nextAttemptInterval)
}
2 changes: 1 addition & 1 deletion src/util.js
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
require('dotenv').config()
import * as config from './configuration'
const contractAbi = require('./contract_abi.json')
import { utils } from 'near-api-js'
import axios from 'axios'
Expand Down

0 comments on commit a352610

Please sign in to comment.