Skip to content

Commit

Permalink
feat: keep observed domains mapped to entities requested on at least …
Browse files Browse the repository at this point in the history
…50 different pages and generate 2024_07 data

feat: keep saving in file all observed domains with minimum observations

fix: tpw table exists

feat: automated script splitting into 3 steps, add logs and clean table if needed

feat: compute data for 2024_07_01
  • Loading branch information
Guillaume NICOLAS authored and patrickhulce committed Sep 13, 2024
1 parent 527f640 commit eb07e11
Show file tree
Hide file tree
Showing 9 changed files with 49,041 additions and 689 deletions.
1,150 changes: 565 additions & 585 deletions README.md

Large diffs are not rendered by default.

122 changes: 74 additions & 48 deletions bin/automated-update.js
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,9 @@ const childProcess = require('child_process')
const util = require('util')
const {Transform, finished} = require('stream')
const {BigQuery} = require('@google-cloud/bigquery')
const {pipeline} = require('node:stream/promises')

const {getEntity} = require('../lib/')
const {getEntity, entities} = require('../lib/')

const bigQuery = new BigQuery()

Expand Down Expand Up @@ -82,23 +83,25 @@ async function getTargetDatasetDate() {
return {dateStringUnderscore, dateStringHypens}
}

const getQueryResultStream = async query => {
const getQueryResultStream = async (query, params) => {
const [job] = await bigQuery.createQueryJob({
query,
location: 'US',
useQueryCache: false,
params,
})

return job.getQueryResultsStream()
}
const resolveOnFinished = streams => {
const toFinishedPromise = util.promisify(finished)
return Promise.all(streams.map(s => toFinishedPromise(s)))
}
const getJSONStringTransformer = rowCounter => {
const getJSONStringTransformer = (toJSONArrayString = false) => {
let rowCounter = 0
return new Transform({
objectMode: true,
transform(row, _, callback) {
const toJSONArrayString = rowCounter !== undefined
const prefix = toJSONArrayString ? (!rowCounter++ ? '[\n' : ',\n') : ''
const suffix = toJSONArrayString ? '' : '\n'
callback(null, prefix + JSON.stringify(row) + suffix)
Expand All @@ -123,7 +126,7 @@ const getThirdPartyWebTable = async tableName => {
projectId: process.env.OVERRIDE_LH_PROJECT,
})
const thirdPartyWebTable = thirdPartyWebDataset.table(tableName)
const thirdPartyWebTableExits = await thirdPartyWebTable.exists()
const [thirdPartyWebTableExits] = await thirdPartyWebTable.exists()
if (thirdPartyWebTableExits) return thirdPartyWebTable
const [table] = await thirdPartyWebDataset.createTable(tableName, {
schema: [
Expand All @@ -140,6 +143,7 @@ async function main() {

const observedDomainsFilename = `${__dirname}/../data/${dateStringHypens}-observed-domains.json`
const entityScriptingFilename = `${__dirname}/../data/${dateStringHypens}-entity-scripting.json`
const mostObservedDomainsFilename = `${__dirname}/../sql/most-observed-domains-query.sql`
const allObservedDomainsFilename = `${__dirname}/../sql/all-observed-domains-query.sql`
const entityPerPageFilename = `${__dirname}/../sql/entity-per-page.sql`

Expand All @@ -151,94 +155,116 @@ async function main() {
exitFn: () => process.exit(1),
})

const mostObservedDomainsQuery = getQueryForTable(
mostObservedDomainsFilename,
dateStringUnderscore
)
const allObservedDomainsQuery = getQueryForTable(allObservedDomainsFilename, dateStringUnderscore)
const entityPerPageQuery = getQueryForTable(entityPerPageFilename, dateStringUnderscore)

// Create all domains table.
// 1. Get and write in 'observed-domains' json file domains observed more than 50 times
await withExistenceCheck(observedDomainsFilename, {
checkExistenceFn: () => fs.existsSync(observedDomainsFilename),
actionFn: async () => {
console.log(`Start observed domains query`)
console.log(`1️⃣ Start "most observed domains" query`)

const start = Date.now()

const resultsStream = await getQueryResultStream(allObservedDomainsQuery)

// Observed domain json file pipe
let observedDomainsNbRows = 0
const observedDomainsFileWriterStream = fs.createWriteStream(observedDomainsFilename)
resultsStream
const queryResultStream = await getQueryResultStream(mostObservedDomainsQuery)

await pipeline(
queryResultStream,
// stringify observed domain json (with json array prefix based on row index)
.pipe(getJSONStringTransformer(observedDomainsNbRows))
getJSONStringTransformer(true),
// write to observed-domains json file
.pipe(observedDomainsFileWriterStream)
observedDomainsFileWriterStream
)

// Close observed domains json array in file
fs.appendFileSync(observedDomainsFilename, '\n]')

console.log(`✅ Finish "most observed domains" query in ${(Date.now() - start) / 1000}s.`)
},
deleteFn: () => fs.rmSync(observedDomainsFilename),
exitFn: () => {},
})

//2. Get and write in 'third_party_web' table all observed domains mapped to entity observed at least 50 times
await withExistenceCheck(`third_party_web/${dateStringUnderscore}`, {
checkExistenceFn: () =>
bigQuery
.dataset('third_party_web')
.table(dateStringUnderscore)
.exists()
.then(([exists]) => exists),
actionFn: async () => {
console.log(`2️⃣ Start "all observed domains" query`)

// Observed domain entity mapping table pipe
const start = Date.now()

const domainEntityMapping = entities.reduce((array, {name, domains}) => {
return array.concat(domains.map(domain => ({name, domain})))
}, [])
const thirdPartyWebTableWriterStream = await getThirdPartyWebTable(dateStringUnderscore).then(
table =>
table.createWriteStream({
sourceFormat: 'NEWLINE_DELIMITED_JSON',
})
)

resultsStream
const queryResultStream = await getQueryResultStream(allObservedDomainsQuery, {
entities_string: JSON.stringify(domainEntityMapping),
})

await pipeline(
queryResultStream,
// map observed domain to entity
.pipe(EntityCanonicalDomainTransformer)
EntityCanonicalDomainTransformer,
// stringify json with new line delimiter
.pipe(getJSONStringTransformer())
getJSONStringTransformer(),
// write to thrid_party_web table
.pipe(thirdPartyWebTableWriterStream)

// Wait both streams to finish
await resolveOnFinished([observedDomainsFileWriterStream, thirdPartyWebTableWriterStream])

// Close observed domains json array in file
fs.appendFileSync(observedDomainsFilename, '\n]')

console.log(
`Finish query in ${(Date.now() - start) / 1000}s. Wrote ${observedDomainsNbRows} rows.`
thirdPartyWebTableWriterStream
)

console.log(`✅ Finish query in ${(Date.now() - start) / 1000}s.`)
},
deleteFn: async () => {
await bigQuery
deleteFn: () =>
bigQuery
.dataset('third_party_web')
.table(dateStringUnderscore)
.delete()
.catch(() => {})
},
.catch(err => {
console.error('could not delete third_party_web table', err)
}),
exitFn: () => {},
})

// Run entity scripting query.
//3. Run entity scripting query.
await withExistenceCheck(entityScriptingFilename, {
checkExistenceFn: () => fs.existsSync(entityScriptingFilename),
actionFn: async () => {
console.log(`Start entity scripting query`)
console.log(`3️⃣ Start entity scripting query`)

const start = Date.now()

const resultsStream = await getQueryResultStream(entityPerPageQuery)

// Entity scripting json file pipe
let entityScriptingNbRows = 0
const entityScriptingFileWriterStream = fs.createWriteStream(entityScriptingFilename)
resultsStream
const queryResultsStream = await getQueryResultStream(entityPerPageQuery)

await pipeline(
queryResultsStream,
// stringify entity scripting json (with json array prefix based on row index)
.pipe(getJSONStringTransformer(entityScriptingNbRows))
getJSONStringTransformer(true),
// write to entity-scripting json file
.pipe(entityScriptingFileWriterStream)

// Wait stream to finish
await resolveOnFinished([entityScriptingFileWriterStream])

console.log(
`Finish query in ${(Date.now() - start) / 1000}s. Wrote ${entityScriptingNbRows} rows.`
entityScriptingFileWriterStream
)

console.log(`✅ Finish query in ${(Date.now() - start) / 1000}s.`)

// Close observed domains json array in file
fs.appendFileSync(entityScriptingFilename, ']')
},
deleteFn: () => {},
deleteFn: () => fs.rmSync(entityScriptingFilename),
exitFn: () => {},
})

Expand Down
Binary file modified by-category.png
Loading
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Loading

0 comments on commit eb07e11

Please sign in to comment.