diff --git a/Cargo.lock b/Cargo.lock index ff9c07f7..ce42357c 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -155,7 +155,7 @@ dependencies = [ "css-inline", "db", "drop-tracer", - "futures 0.3.25", + "futures", "futures-util", "geoip", "hex", @@ -191,7 +191,7 @@ dependencies = [ "static_init", "test-util", "thiserror", - "time 0.3.28", + "time", "tokio", "tokio-stream", "ts-rs", @@ -627,7 +627,7 @@ dependencies = [ "serde", "serde_bytes", "serde_json", - "time 0.3.28", + "time", "uuid", ] @@ -781,23 +781,6 @@ dependencies = [ "os_str_bytes", ] -[[package]] -name = "client" -version = "0.1.0" -dependencies = [ - "bytes", - "dotenv", - "ffmpeg", - "futures 0.3.25", - "hyper 0.14.27", - "jemallocator", - "lazy_static", - "rand 0.8.5", - "reqwest", - "static_init", - "tokio", -] - [[package]] name = "codespan-reporting" version = "0.11.1" @@ -872,7 +855,7 @@ dependencies = [ "console-api", "crossbeam-channel", "crossbeam-utils", - "futures 0.3.25", + "futures", "hdrhistogram", "humantime", "parking_lot 0.12.1", @@ -947,7 +930,7 @@ dependencies = [ "rand 0.8.5", "sha2", "subtle", - "time 0.3.28", + "time", "version_check", ] @@ -975,58 +958,12 @@ dependencies = [ "cfg-if", ] -[[package]] -name = "create-random-stream-connections" -version = "0.1.0" -dependencies = [ - "anyhow", - "config", - "db", - "futures 0.3.25", - "futures-util", - "geoip", - "log", - "logger", - "mongodb", - "owo-colors 3.5.0", - "rand 0.8.5", - "serde-util", - "thiserror", - "time 0.3.28", - "tokio", - "user-agent", -] - -[[package]] -name = "create-test-accounts" -version = "0.1.0" -dependencies = [ - "anyhow", - "config", - "db", - "futures 0.3.25", - "futures-util", - "log", - "logger", - "mongodb", - "owo-colors 3.5.0", - "serde-util", - "thiserror", - "tokio", -] - [[package]] name = "critical-section" version = "1.1.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "6548a0ad5d2549e111e1f6a11a6c2e2d00ce6a3dafe22948d67c2b443f775e52" -[[package]] -name = "crossbeam" -version = "0.2.12" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "bd66663db5a988098a89599d4857919b3acf7f61402e61365acfd3919857b9be" - [[package]] name = "crossbeam-channel" version = "0.5.7" @@ -1135,7 +1072,7 @@ dependencies = [ "memchr", "pico-args", "rayon", - "smallvec 1.10.0", + "smallvec", "url", ] @@ -1158,7 +1095,7 @@ dependencies = [ "phf 0.8.0", "proc-macro2", "quote", - "smallvec 1.10.0", + "smallvec", "syn 1.0.107", ] @@ -1175,7 +1112,7 @@ dependencies = [ "phf 0.10.1", "proc-macro2", "quote", - "smallvec 1.10.0", + "smallvec", "syn 1.0.107", ] @@ -1364,7 +1301,7 @@ dependencies = [ "static_init", "strum", "thiserror", - "time 0.3.28", + "time", "tokio", "ts-rs", "uid", @@ -1752,7 +1689,7 @@ dependencies = [ "lebe", "miniz_oxide 0.7.1", "rayon-core", - "smallvec 1.10.0", + "smallvec", "zune-inflate", ] @@ -1808,14 +1745,6 @@ dependencies = [ "url", ] -[[package]] -name = "ffstress" -version = "0.1.0" -dependencies = [ - "ffmpeg", - "tokio", -] - [[package]] name = "flate2" version = "1.0.25" @@ -1882,12 +1811,6 @@ version = "1.2.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "2022715d62ab30faffd124d40b76f4134a550a87792276512b18d63272333394" -[[package]] -name = "fuchsia-cprng" -version = "0.1.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "a06f77d526c1a601b7c4cdd98f54b5eaabffc14d5f2f0296febdc7f357c6d3ba" - [[package]] name = "funty" version = "2.0.0" @@ -1904,12 +1827,6 @@ dependencies = [ "new_debug_unreachable", ] -[[package]] -name = "futures" -version = "0.1.31" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3a471a38ef8ed83cd6e40aa59c1ffe17db6855c18e3604d9c4ed8c08ebc28678" - [[package]] name = "futures" version = "0.3.25" @@ -2293,17 +2210,6 @@ dependencies = [ "hmac", ] -[[package]] -name = "hls-logger" -version = "0.1.0" -dependencies = [ - "hyper 0.14.27", - "log", - "logger", - "prex", - "tokio", -] - [[package]] name = "hmac" version = "0.12.1" @@ -2830,7 +2736,7 @@ checksum = "15d30b9b346f215bebb2683e1f0947c49411fc488a41b05521a1fc3aa825f7e9" dependencies = [ "generic-array", "serde", - "time 0.3.28", + "time", ] [[package]] @@ -3184,7 +3090,7 @@ dependencies = [ "log", "owo-colors 3.5.0", "static_init", - "time 0.3.28", + "time", ] [[package]] @@ -3299,12 +3205,6 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "b87248edafb776e59e6ee64a79086f65890d3510f2c656c000bf2a7e8a0aea40" -[[package]] -name = "maybe-uninit" -version = "2.0.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60302e4db3a61da70c0cb7991976248362f30319e88850c487b9b95bbf059e00" - [[package]] name = "md-5" version = "0.10.5" @@ -3337,7 +3237,7 @@ dependencies = [ "shutdown", "stream-util", "thiserror", - "time 0.3.28", + "time", "tokio", "tokio-stream", "url", @@ -3409,26 +3309,6 @@ dependencies = [ "syn 2.0.48", ] -[[package]] -name = "migrations" -version = "0.1.0" -dependencies = [ - "anyhow", - "config", - "crypt", - "db", - "dotenv", - "futures-util", - "log", - "logger", - "macros", - "mongodb", - "owo-colors 3.5.0", - "serde", - "serde-util", - "tokio", -] - [[package]] name = "mime" version = "0.3.16" @@ -3481,17 +3361,6 @@ dependencies = [ "windows-sys 0.48.0", ] -[[package]] -name = "mixed" -version = "0.1.0" -dependencies = [ - "bytes", - "ffmpeg", - "hyper 0.14.27", - "prex", - "tokio", -] - [[package]] name = "modify" version = "1.3.0" @@ -3593,19 +3462,6 @@ dependencies = [ "static_assertions", ] -[[package]] -name = "multiqueue" -version = "0.3.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4059673f3516669cbf7ebb448cb37171559ed22e6d8bc79cf0cf9394cf9e73fd" -dependencies = [ - "crossbeam", - "futures 0.1.31", - "parking_lot 0.3.8", - "smallvec 0.3.4", - "time 0.1.45", -] - [[package]] name = "nanohtml2text" version = "0.1.4" @@ -3782,7 +3638,7 @@ dependencies = [ "dialoguer", "dotenv", "drop-tracer", - "futures 0.3.25", + "futures", "hyper 0.14.27", "jemallocator", "local-ip-address", @@ -3816,12 +3672,6 @@ version = "6.4.1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "9b7820b9daea5457c9f21c69448905d723fbd21136ccf521748f23fd49e723ee" -[[package]] -name = "owning_ref" -version = "0.2.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9d52571ddcb42e9c900c901a18d8d67e393df723fcd51dd59c5b1a85d0acb6cc" - [[package]] name = "owo-colors" version = "3.5.0" @@ -3835,17 +3685,6 @@ version = "3.5.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "c1b04fb49957986fdce4d6ee7a65027d55d4b6d2265e5848bbb507b58ccfdb6f" -[[package]] -name = "parking_lot" -version = "0.3.8" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "fa12d706797d42551663426a45e2db2e0364bd1dbf6aeada87e89c5f981f43e9" -dependencies = [ - "owning_ref", - "parking_lot_core 0.2.14", - "thread-id", -] - [[package]] name = "parking_lot" version = "0.11.2" @@ -3867,18 +3706,6 @@ dependencies = [ "parking_lot_core 0.9.5", ] -[[package]] -name = "parking_lot_core" -version = "0.2.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "4db1a8ccf734a7bce794cc19b3df06ed87ab2f3907036b693c68f56b4d4537fa" -dependencies = [ - "libc 0.2.147", - "rand 0.4.6", - "smallvec 0.6.14", - "winapi", -] - [[package]] name = "parking_lot_core" version = "0.8.6" @@ -3889,7 +3716,7 @@ dependencies = [ "instant", "libc 0.2.147", "redox_syscall 0.2.16", - "smallvec 1.10.0", + "smallvec", "winapi", ] @@ -3902,7 +3729,7 @@ dependencies = [ "cfg-if", "libc 0.2.147", "redox_syscall 0.2.16", - "smallvec 1.10.0", + "smallvec", "windows-sys 0.42.0", ] @@ -4166,7 +3993,7 @@ dependencies = [ "async-trait", "bytes", "constants", - "futures 0.3.25", + "futures", "http-auth-basic", "hyper 0.14.27", "hyper-tungstenite", @@ -4233,27 +4060,6 @@ dependencies = [ "unicode-ident", ] -[[package]] -name = "producer" -version = "0.1.0" -dependencies = [ - "api", - "async-stream", - "bytes", - "constants", - "db", - "dotenv", - "ffmpeg", - "futures 0.1.31", - "hyper 0.14.27", - "multiqueue", - "reqwest", - "serde_json", - "stream-util", - "thiserror", - "tokio", -] - [[package]] name = "prost" version = "0.11.9" @@ -4293,26 +4099,6 @@ dependencies = [ "lazy-regex", ] -[[package]] -name = "purge-deleted-items" -version = "0.1.0" -dependencies = [ - "anyhow", - "config", - "crypt", - "db", - "dotenv", - "futures-util", - "log", - "logger", - "macros", - "mongodb", - "owo-colors 3.5.0", - "serde", - "serde-util", - "tokio", -] - [[package]] name = "qoi" version = "0.4.1" @@ -4358,19 +4144,6 @@ version = "0.7.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "dc33ff2d4973d518d823d61aa239014831e521c75da58e3df4840d3f47749d09" -[[package]] -name = "rand" -version = "0.4.6" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "552840b97013b1a26992c11eac34bdd778e464601a4c2054b5f0bff7c6761293" -dependencies = [ - "fuchsia-cprng", - "libc 0.2.147", - "rand_core 0.3.1", - "rdrand", - "winapi", -] - [[package]] name = "rand" version = "0.7.3" @@ -4416,21 +4189,6 @@ dependencies = [ "rand_core 0.6.4", ] -[[package]] -name = "rand_core" -version = "0.3.1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "7a6fdeb83b075e8266dcc8762c22776f6877a63111121f5f8c7411e5be7eed4b" -dependencies = [ - "rand_core 0.4.2", -] - -[[package]] -name = "rand_core" -version = "0.4.2" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "9c33a3c44ca05fa6f1807d8e6743f3824e8509beca625669633be0acbdf509dc" - [[package]] name = "rand_core" version = "0.5.1" @@ -4498,21 +4256,6 @@ dependencies = [ "num_cpus", ] -[[package]] -name = "rdrand" -version = "0.4.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "678054eb77286b51581ba43620cc911abf02758c91f93f479767aed0f90458b2" -dependencies = [ - "rand_core 0.3.1", -] - -[[package]] -name = "redox_syscall" -version = "0.1.57" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "41cc0f7e4d5d4544e8861606a285bb08d3e70712ccc7d2b84d7c0ccfaf4b05ce" - [[package]] name = "redox_syscall" version = "0.2.16" @@ -4662,12 +4405,10 @@ dependencies = [ "serde_urlencoded", "tokio", "tokio-rustls", - "tokio-util", "tower-service", "url", "wasm-bindgen", "wasm-bindgen-futures", - "wasm-streams", "web-sys", "webpki-roots 0.22.6", "winreg", @@ -4753,7 +4494,7 @@ version = "0.1.0" dependencies = [ "async-trait", "db", - "futures 0.3.25", + "futures", "http 0.1.0", "hyper 0.14.27", "log", @@ -5082,7 +4823,7 @@ dependencies = [ "phf_codegen 0.8.0", "precomputed-hash", "servo_arc", - "smallvec 1.10.0", + "smallvec", "thin-slice", ] @@ -5132,7 +4873,7 @@ dependencies = [ "serde", "serde_json", "static_init", - "time 0.3.28", + "time", "ts-rs", ] @@ -5244,7 +4985,7 @@ dependencies = [ "serde", "serde_json", "serde_with_macros 3.0.0", - "time 0.3.28", + "time", ] [[package]] @@ -5433,21 +5174,6 @@ dependencies = [ "unidecode", ] -[[package]] -name = "smallvec" -version = "0.3.4" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "e143aeee11cc8ece23c8336394de5138e598b84f5720fb8e895e2c6096322d88" - -[[package]] -name = "smallvec" -version = "0.6.14" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "b97fcaeba89edba30f044a10c6a3cc39df9c3f17d7cd829dd1446cab35f890e0" -dependencies = [ - "maybe-uninit", -] - [[package]] name = "smallvec" version = "1.10.0" @@ -5599,7 +5325,7 @@ dependencies = [ "db", "defer", "drop-tracer", - "futures 0.3.25", + "futures", "http 0.1.0", "hyper 0.14.27", "ip-counter", @@ -5807,7 +5533,7 @@ dependencies = [ "lexical", "num-bigint", "serde", - "smallvec 1.10.0", + "smallvec", "swc_atoms", "swc_common", "swc_ecma_ast", @@ -6000,17 +5726,6 @@ dependencies = [ "syn 2.0.48", ] -[[package]] -name = "thread-id" -version = "3.3.0" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "c7fbf4c9d56b320106cd64fd024dadfa0be7cb4706725fc44a7d7ce952d820c1" -dependencies = [ - "libc 0.2.147", - "redox_syscall 0.1.57", - "winapi", -] - [[package]] name = "thread_local" version = "1.1.7" @@ -6032,17 +5747,6 @@ dependencies = [ "weezl", ] -[[package]] -name = "time" -version = "0.1.45" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1b797afad3f312d1c66a56d11d0316f916356d11bd158fbc6ca6389ff6bf805a" -dependencies = [ - "libc 0.2.147", - "wasi 0.10.0+wasi-snapshot-preview1", - "winapi", -] - [[package]] name = "time" version = "0.3.28" @@ -6361,7 +6065,7 @@ dependencies = [ "lazy_static", "log", "rand 0.8.5", - "smallvec 1.10.0", + "smallvec", "thiserror", "tinyvec", "tokio", @@ -6382,7 +6086,7 @@ dependencies = [ "lru-cache", "parking_lot 0.12.1", "resolv-conf", - "smallvec 1.10.0", + "smallvec", "thiserror", "tokio", "trust-dns-proto", @@ -6761,12 +6465,6 @@ version = "0.9.0+wasi-snapshot-preview1" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "cccddf32554fecc6acb585f82a32a72e28b48f8c4c1883ddfeeeaa96f7d8e519" -[[package]] -name = "wasi" -version = "0.10.0+wasi-snapshot-preview1" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "1a143597ca7c7793eff794def352d41792a93c481eb1042423ff7ff72ba2c31f" - [[package]] name = "wasi" version = "0.11.0+wasi-snapshot-preview1" @@ -6839,19 +6537,6 @@ version = "0.2.83" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "1c38c045535d93ec4f0b4defec448e4291638ee608530863b1e2ba115d4fff7f" -[[package]] -name = "wasm-streams" -version = "0.2.3" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "6bbae3363c08332cadccd13b67db371814cd214c2524020932f0804b8cf7c078" -dependencies = [ - "futures-util", - "js-sys", - "wasm-bindgen", - "wasm-bindgen-futures", - "web-sys", -] - [[package]] name = "web-sys" version = "0.3.60" diff --git a/Cargo.toml b/Cargo.toml index eb3a4f1e..754270b4 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -20,16 +20,6 @@ resolver = "2" members = [ "rs/bin/openstream", - "rs/internal-scripts/migrations", - "rs/internal-scripts/create-test-accounts", - "rs/internal-scripts/producer", - "rs/internal-scripts/client", - "rs/internal-scripts/mixed", - "rs/internal-scripts/ffstress", - "rs/internal-scripts/hls-logger", - "rs/internal-scripts/create-random-stream-connections", - "rs/internal-scripts/purge-deleted-items", - "rs/config/constants", "rs/packages/openapi", diff --git a/audio/audio-5s.mp3 b/audio/audio-5s.mp3 deleted file mode 100644 index f013a849..00000000 Binary files a/audio/audio-5s.mp3 and /dev/null differ diff --git a/audio/audio-metadata-15s.mp3 b/audio/audio-metadata-15s.mp3 deleted file mode 100644 index a28b3550..00000000 Binary files a/audio/audio-metadata-15s.mp3 and /dev/null differ diff --git a/audio/audio.aac b/audio/audio.aac deleted file mode 100644 index 9f8ebd47..00000000 Binary files a/audio/audio.aac and /dev/null differ diff --git a/audio/audio.mp3 b/audio/audio.mp3 deleted file mode 100644 index 363319c1..00000000 Binary files a/audio/audio.mp3 and /dev/null differ diff --git a/audio/partial-server.mjs b/audio/partial-server.mjs deleted file mode 100644 index 8b2a7228..00000000 --- a/audio/partial-server.mjs +++ /dev/null @@ -1,18 +0,0 @@ -import http from "http"; -import fs from "fs"; -import { fileURLToPath } from "url"; -import path from "path"; - -const __dirname = path.dirname(fileURLToPath(import.meta.url)); - -const file = fs.readFileSync(`${__dirname}/audio-5s.mp3`); - -http.createServer((req, res) => { - res.writeHead(200, { - "content-type": "audio/mpeg", - }); - - res.write(file); -}).listen(3000, () => { - console.log("server listening on port 3000"); -}) \ No newline at end of file diff --git a/build.sh b/build.sh deleted file mode 100755 index eaa4affd..00000000 --- a/build.sh +++ /dev/null @@ -1,3 +0,0 @@ -#!/bin/bash -cargo build --release --bin openstream && -cd front && npm run ci && npm run build && cd .. \ No newline at end of file diff --git a/docs.sh b/docs.sh old mode 100755 new mode 100644 diff --git a/openstream-1.config.cjs b/openstream-1.config.cjs deleted file mode 100644 index 0244850a..00000000 --- a/openstream-1.config.cjs +++ /dev/null @@ -1,31 +0,0 @@ -module.exports = { - apps: [{ - namespace: "s1", - name: "openstream-1", - exec_mode: "fork", - time: false, - merge_logs: true, - kill_timeout: 60_000, - script: "./target/release/openstream", - args: ["start", "-c", "./openstream.toml"], - instances: 1, - env: { - RUST_LOG_STYLE: "always", - FORCE_COLOR: 1, - } - }, { - namespace: "s1", - script: "./openstream-front.mjs", - interpreter_args: ["--no-warnings", "--experimental-specifier-resolution=node"], - args: ["start", "-c", "./openstream-front.toml"], - time: false, - merge_logs: true, - name: "openstream-front-1", - env: { - FORCE_COLOR: "1", - LOG_TS: "1", - }, - instances: 4, - mode: "cluster", - }] -} \ No newline at end of file diff --git a/openstream-2.config.cjs b/openstream-2.config.cjs deleted file mode 100644 index 4829da87..00000000 --- a/openstream-2.config.cjs +++ /dev/null @@ -1,31 +0,0 @@ -module.exports = { - apps: [{ - namespace: "s2", - name: "openstream-2", - exec_mode: "fork", - time: false, - merge_logs: true, - kill_timeout: 60_000, - script: "./target/release/openstream", - args: ["start", "-c", "./openstream-2.toml"], - instances: 1, - env: { - RUST_LOG_STYLE: "always", - FORCE_COLOR: "1", - } - }, { - namespace: "s2", - script: "./openstream-front.mjs", - interpreter_args: ["--no-warnings", "--experimental-specifier-resolution=node"], - args: ["start", "-c", "./openstream-front-2.toml"], - time: false, - merge_logs: true, - name: "openstream-front-2", - env: { - FORCE_COLOR: "1", - LOG_TS: "1", - }, - instances: 4, - mode: "cluster", - }] -} \ No newline at end of file diff --git a/rs/internal-scripts/README.md b/rs/internal-scripts/README.md deleted file mode 100644 index 3675e7bb..00000000 --- a/rs/internal-scripts/README.md +++ /dev/null @@ -1,3 +0,0 @@ -# Internal scripts - -This folder contains internal scripts written in rust for some internal benchmarking and testing, do not use \ No newline at end of file diff --git a/rs/internal-scripts/client/Cargo.toml b/rs/internal-scripts/client/Cargo.toml deleted file mode 100644 index 0acc0ffd..00000000 --- a/rs/internal-scripts/client/Cargo.toml +++ /dev/null @@ -1,21 +0,0 @@ -[package] -name = "client" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -bytes = "1.2.1" -dotenv = "0.15.0" -ffmpeg = { version = "0.1.0", path = "../../packages/ffmpeg" } -futures = "0.3.24" -hyper = { version = "0.14.27", features = ["full"] } -jemallocator = "0.5.0" -lazy_static = "1.4.0" -rand = "0.8.5" -reqwest = { version = "0.11", default-features = false, features = [ "rustls-tls" ] } -static_init = "1.0.3" -tokio = { version = "1.29.0", features = ["full"] } - -[features] diff --git a/rs/internal-scripts/client/src/main.rs b/rs/internal-scripts/client/src/main.rs deleted file mode 100644 index cae59f90..00000000 --- a/rs/internal-scripts/client/src/main.rs +++ /dev/null @@ -1,277 +0,0 @@ -use bytes::Bytes; -use ffmpeg::{Ffmpeg, FfmpegConfig, FfmpegSpawn}; -use hyper::Body; -use reqwest::Client; -use std::sync::atomic::{AtomicUsize, Ordering}; -use std::time::{Duration, Instant}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; - -use jemallocator::Jemalloc; - -#[global_allocator] -static ALLOCATOR: Jemalloc = Jemalloc; - -static CONNECTING_CLIENTS: AtomicUsize = AtomicUsize::new(0); -static CURRENT_CLIENTS: AtomicUsize = AtomicUsize::new(0); -static HISTORIC_CLIENTS: AtomicUsize = AtomicUsize::new(0); -static BYTES_READED: AtomicUsize = AtomicUsize::new(0); -static ERRORS: AtomicUsize = AtomicUsize::new(0); - -static BODY: &[u8] = include_bytes!("../../../../audio/audio.mp3"); - -const DEFAULT_C: usize = 20_000; - -const DEFAULT_S: u64 = 10_000; - -lazy_static::lazy_static! { - static ref S: u64 = { - match std::env::var("S") { - Err(_) => DEFAULT_S, - Ok(s) => s.parse::().unwrap_or(DEFAULT_S), - } - }; -} - -// fn rr_test_station_n() -> u64 { -// static RR_STATION_N: AtomicU64 = AtomicU64::new(0); -// let v = RR_STATION_N.fetch_add(1, Ordering::SeqCst); -// 1 + (v % *S) -// } - -// fn rr_test_station_id() -> String { -// format!("test{}", rr_test_station_n()) -// } - -#[tokio::main] -async fn main() { - let _ = dotenv::dotenv(); - - // let source_base = std::env::var("SOURCE_BASE_URL") - // .expect("SOURCE_BASE_URL env not set") - // .trim_end_matches('/') - // .to_string(); - - let stream_base_url = std::env::var("STREAM_BASE_URL") - .unwrap_or_else(|_| String::from("http://127.0.0.1")) - .trim_end_matches('/') - .to_string(); - - let ports: Vec = std::env::var("STREAM_PORTS") - .unwrap_or_else(|_| String::from("10200,10201,10202,10203,10204,10205,10206,10207")) - .split(',') - .map(|s| s.trim().parse().expect("invalid STREAM_PORTS env")) - .collect(); - - // mitre - let station_id = std::env::var("STATION_ID").unwrap_or_else(|_| String::from("qe6fm5ev")); - - let delay: u64 = match std::env::var("D") { - Ok(s) => s.parse().unwrap_or(5), - Err(_) => 5, - }; - - // let mountpoint_id = std::env::var("S").unwrap_or_else(|_| String::from("jr8n73bs")); - // let _: Uri = source_base.parse().expect("SOURCE_BASE_URL invalid URL"); - - let c: usize = match std::env::var("C") { - Ok(s) => s.parse().unwrap_or(DEFAULT_C), - Err(_) => DEFAULT_C, - }; - - // println!("mounpoint id: {mountpoint_id}"); - // println!("source base: {source_base}"); - println!("stream base url: {stream_base_url}"); - println!("stream ports: {ports:?}"); - println!("station id: {station_id}"); - println!("concurrency: {c}"); - println!("delay: {delay}"); - - let _ = tokio::try_join!( - // tokio::spawn(producer(source_base, mountpoint_id.clone())), - tokio::spawn(clients(c, stream_base_url, ports, station_id, delay)), - tokio::spawn(print_stats()) - ) - .unwrap(); -} - -#[allow(unused)] -async fn producer(base: String, id: String) { - let client = Client::new(); - let (mut tx, body) = Body::channel(); - - let config = FfmpegConfig { - readrate: true, - ..Default::default() - }; - - let FfmpegSpawn { - child: _child, - mut stdin, - mut stdout, - .. - } = Ffmpeg::new(config).spawn().expect("ffmpeg spawn"); - - tokio::spawn(async move { - loop { - stdin.write_all(BODY).await.expect("ffmpeg write"); - } - }); - - let _sender = tokio::spawn(async move { - let mut buf = [0u8; 8000]; - loop { - let n = stdout.read(&mut buf).await.expect("ffmpeg read"); - tx.send_data(Bytes::copy_from_slice(&buf[0..n])) - .await - .expect("producer send_data"); - } - }); - - let response = client - .put(format!("{base}/{id}/source")) - .body(body) - .send() - .await - .expect("producer send request"); - - println!("producer status: {:?}", response.status()); - - let body = response.text().await.expect("producer text().await"); - - println!("producer body: {body}"); - panic!("producer terminated"); -} - -async fn clients( - n: usize, - stream_base_url: String, - ports: Vec, - station_id: String, - delay: u64, -) { - tokio::time::sleep(Duration::from_millis(1_000)).await; - - let http_client = Client::builder() - .http1_only() - .build() - .expect("build client"); - - tokio::spawn(async move { - for i in 0..n { - tokio::time::sleep(Duration::from_millis(delay)).await; - let port = ports[i % ports.len()]; - let base_url = stream_base_url.clone(); - let http_client = http_client.clone(); - let station_id = station_id.clone(); - tokio::spawn(async move { - loop { - let r = client(&http_client, base_url.as_str(), port, &station_id).await; - if let Err(e) = r { - ERRORS.fetch_add(1, Ordering::Relaxed); - println!("err: {}", e); - } - CURRENT_CLIENTS.fetch_sub(1, Ordering::Relaxed); - } - }); - } - }) - .await - .unwrap(); -} - -/* -async fn client(base: &str, id: &str) -> Result<(), std::io::Error> { - let url: Uri = base.parse().unwrap(); - let addr = SocketAddr::from(([0, 0, 0, 0], url.port_u16().unwrap_or(80))); - - CONNECTING_CLIENTS.fetch_add(1, Ordering::Relaxed); - - let mut socket = TcpStream::connect(addr).await?; - - CONNECTING_CLIENTS.fetch_sub(1, Ordering::Relaxed); - CURRENT_CLIENTS.fetch_add(1, Ordering::Relaxed); - HISTORIC_CLIENTS.fetch_add(1, Ordering::Relaxed); - - socket - .write_all(format!("GET /broadcast/{id} HTTP/1.0\r\n").as_bytes()) - .await?; - socket - .write_all(format!("host: localhost\r\n\r\n").as_bytes()) - .await?; - - let mut buf = [0; 8000]; - loop { - let n = socket.read(&mut buf).await?; - if n == 0 { - break; - } - BYTES_READED.fetch_add(n, Ordering::Relaxed); - } - - Ok(()) -} -*/ -async fn client( - client: &Client, - base_url: &str, - port: u16, - id: &str, -) -> Result<(), reqwest::Error> { - CURRENT_CLIENTS.fetch_add(1, Ordering::Relaxed); - HISTORIC_CLIENTS.fetch_add(1, Ordering::Relaxed); - - //let client = Client::new(); - let mut res = client - .get(format!("{base_url}:{port}/stream/{id}")) - .send() - .await?; - - if !res.status().is_success() { - let status = res.status(); - let text = res.text().await?; - println!( - "req err with status {} {:?} => {}", - status.as_u16(), - status.canonical_reason(), - text, - ); - - return Ok(()); - } - - let start = Instant::now(); - - while let Some(data) = res.chunk().await? { - BYTES_READED.fetch_add(data.len(), Ordering::Relaxed); - if start.elapsed() > Duration::from_secs(60_000) { - break; - } - } - - Ok(()) -} - -async fn print_stats() { - let mut prev = 0; - let mut interval = tokio::time::interval(Duration::from_secs(1)); - interval.tick().await; - loop { - interval.tick().await; - let bytes_readed = BYTES_READED.load(Ordering::Relaxed); - let speed = bytes_readed - prev; - prev = bytes_readed; - - let historic_clients = HISTORIC_CLIENTS.load(Ordering::Relaxed); - let current_clients = CURRENT_CLIENTS.load(Ordering::Relaxed); - let connecting_clients = CONNECTING_CLIENTS.load(Ordering::Relaxed); - let errors = ERRORS.load(Ordering::Relaxed); - - println!("=========================================="); - println!("{connecting_clients} connecting clients"); - println!("{current_clients} open connections"); - println!("{historic_clients} all time connections"); - println!("{errors} errors"); - println!("{} MB", bytes_readed as f64 / 1024_f64 / 1024_f64); - println!("{} MB/sec", speed / 1024 / 1024); - } -} diff --git a/rs/internal-scripts/create-random-stream-connections/Cargo.toml b/rs/internal-scripts/create-random-stream-connections/Cargo.toml deleted file mode 100644 index f82e615e..00000000 --- a/rs/internal-scripts/create-random-stream-connections/Cargo.toml +++ /dev/null @@ -1,24 +0,0 @@ -[package] -name = "create-random-stream-connections" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -anyhow = "1.0.68" -config = { version = "0.1.0", path = "../../packages/config" } -db = { version = "0.1.0", path = "../../packages/db" } -futures = "0.3.25" -futures-util = "0.3.25" -geoip = { version = "0.1.0", path = "../../packages/geoip" } -log = "0.4.17" -logger = { version = "0.1.0", path = "../../packages/logger" } -mongodb = "2.7.0" -owo-colors = { version = "3.5.0", path = "../../packages/owo-colors" } -rand = "0.8.5" -serde-util = { version = "0.1.0", path = "../../packages/serde-util" } -thiserror = "1.0.38" -time = "0.3.20" -tokio = { version = "1.29.0", features = ["full"] } -user-agent = { version = "0.1.0", path = "../../packages/user-agent" } diff --git a/rs/internal-scripts/create-random-stream-connections/src/main.rs b/rs/internal-scripts/create-random-stream-connections/src/main.rs deleted file mode 100644 index 8df2ee1a..00000000 --- a/rs/internal-scripts/create-random-stream-connections/src/main.rs +++ /dev/null @@ -1,194 +0,0 @@ -use anyhow::Context; -use db::{ - http::SocketAddr, - station::Station, - stream_connection::{lite::StreamConnectionLite, StreamConnection}, - Model, -}; -use log::*; -use time::Duration; - -fn random_os() -> Option { - use rand::seq::SliceRandom; // 0.7.2 - - #[allow(clippy::useless_vec)] - let vs = vec![ - Some("Linux"), - Some("Android"), - Some("iPhone"), - Some("iPad"), - Some("Windows"), - Some("Mac OSX"), - None, - ]; - let option = vs.choose(&mut rand::thread_rng()).unwrap(); - option.map(String::from) -} - -fn random_browser() -> Option { - use rand::seq::SliceRandom; // 0.7.2 - - #[allow(clippy::useless_vec)] - let vs = vec![ - Some("Chrome"), - Some("Safari"), - Some("Edge"), - Some("Firefox"), - None, - ]; - let option = vs.choose(&mut rand::thread_rng()).unwrap(); - option.map(String::from) -} - -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { - let c: usize = std::env::var("C") - .context("env C is required")? - .parse() - .context("env C must be a usize")?; - let station_id = std::env::var("S").context("env S is required")?; - create_random_stream_connections(c, station_id).await -} - -async fn create_random_stream_connections( - c: usize, - station_id: String, -) -> Result<(), anyhow::Error> { - use owo_colors::*; - logger::init(); - //let _ = dotenv::dotenv(); - - // let canonical_config_path = std::fs::canonicalize(config.as_str()) - // .with_context(|| format!("error loading config file from {}", config.yellow()))?; - - let config = "./openstream.toml"; - - info!("loading config file from {}", config.yellow()); - - let config = config::load(Some(config)) - .with_context(|| format!("error loading config file from {}", config.yellow(),))?; - - debug!("config loaded: resolved config: {:#?}", config); - - let client = mongodb::Client::with_uri_str(config.mongodb.url.as_str()) - .await - .context("failed to create mongodb client")?; - - if client.default_database().is_none() { - anyhow::bail!("no database specified in config, under [mongodb] url"); - } - - db::init(client, config.mongodb.storage_db_name); - - info!("ensuring mongodb collections..."); - db::ensure_collections() - .await - .context("error ensuring mongodb collections and indexes")?; - - let _ = match Station::get_by_id(&station_id).await? { - None => anyhow::bail!("cannot find station with id {station_id}"), - Some(station) => station, - }; - - for i in 0..c { - create_random_stream_connection(c, &station_id, i).await? - } - - info!("created {c} stream connections"); - - Ok(()) -} - -async fn create_random_stream_connection( - c: usize, - station_id: &str, - i: usize, -) -> Result<(), anyhow::Error> { - if i % 10_000 == 0 { - info!("creating stream connection {i} of {c}"); - } - - let ip = std::net::IpAddr::from([ - rand::random(), - rand::random(), - rand::random(), - rand::random(), - ]); - - let request = db::http::Request { - country_code: geoip::ip_to_country_code(&ip), - real_ip: ip, - local_addr: SocketAddr { - ip: std::net::IpAddr::from([0, 0, 0, 0]), - port: 1, - }, - remote_addr: SocketAddr { - ip: std::net::IpAddr::from([0, 0, 0, 0]), - port: 1, - }, - uri: db::http::Uri { - uri: "".into(), - scheme: None, - host: None, - port: None, - path: "".into(), - query: None, - }, - user_agent: user_agent::UserAgent { - ua: None, - category: None, - browser_type: None, - vendor: None, - name: random_browser(), - version: None, - os: random_os(), - os_version: None, - }, - version: db::http::Version::HTTP_10, - method: db::http::Method::GET, - headers: db::http::Headers::new(), - }; - - let created_at: time::OffsetDateTime = - time::OffsetDateTime::now_utc() - (time::Duration::DAY * 30 * rand::random::()); - - let is_open = rand::random::() < (1_f64 / 30_f64); - - let duration_ms: Option; - let closed_at: Option; - - if is_open { - let mul: f64 = rand::random(); - let ms = (mul * 6000.0) as u64; // 10 min - duration_ms = Some(ms); - closed_at = Some((created_at + Duration::MILLISECOND * (ms as f64)).into()); - } else { - duration_ms = None; - closed_at = None; - }; - - let transfer_bytes = duration_ms.map(|s| s * 16); // 16 kbps - - let document = StreamConnection { - id: StreamConnection::uid(), - station_id: station_id.to_string(), - deployment_id: String::from(""), - is_open, - ip: request.real_ip, - country_code: request.country_code, - transfer_bytes, - duration_ms, - is_external_relay_redirect: false, - created_at: created_at.into(), - last_transfer_at: created_at.into(), - closed_at, - request, - }; - - let document_lite = StreamConnectionLite::from_stream_connection_ref(&document); - - StreamConnection::insert(document).await?; - StreamConnectionLite::insert(document_lite).await?; - - Ok(()) -} diff --git a/rs/internal-scripts/create-test-accounts/Cargo.toml b/rs/internal-scripts/create-test-accounts/Cargo.toml deleted file mode 100644 index f1d45c0d..00000000 --- a/rs/internal-scripts/create-test-accounts/Cargo.toml +++ /dev/null @@ -1,20 +0,0 @@ -[package] -name = "create-test-accounts" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -anyhow = "1.0.68" -config = { version = "0.1.0", path = "../../packages/config" } -db = { version = "0.1.0", path = "../../packages/db" } -futures = "0.3.25" -futures-util = "0.3.25" -log = "0.4.17" -logger = { version = "0.1.0", path = "../../packages/logger" } -mongodb = "2.7.0" -owo-colors = { version = "3.5.0", path = "../../packages/owo-colors" } -serde-util = { version = "0.1.0", path = "../../packages/serde-util" } -thiserror = "1.0.38" -tokio = { version = "1.29.0", features = ["full"] } diff --git a/rs/internal-scripts/create-test-accounts/src/main.rs b/rs/internal-scripts/create-test-accounts/src/main.rs deleted file mode 100644 index 88d9b72a..00000000 --- a/rs/internal-scripts/create-test-accounts/src/main.rs +++ /dev/null @@ -1,150 +0,0 @@ -use anyhow::Context; -use db::{ - audio_chunk::AudioChunk, audio_file::AudioFile, run_transaction, station::Station, Model, -}; -use futures::{StreamExt, TryStreamExt}; -use log::*; -use mongodb::bson::doc; - -const BASE_STATION_ID: &str = "zrmgqj2f"; -const C: usize = 10_000; - -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { - craete_test_stations().await -} - -async fn craete_test_stations() -> Result<(), anyhow::Error> { - use owo_colors::*; - logger::init(); - //let _ = dotenv::dotenv(); - - // let canonical_config_path = std::fs::canonicalize(config.as_str()) - // .with_context(|| format!("error loading config file from {}", config.yellow()))?; - - let config = "./openstream.toml"; - - info!("loading config file from {}", config.yellow()); - - let config = config::load(Some(config)) - .with_context(|| format!("error loading config file from {}", config.yellow(),))?; - - debug!("config loaded: resolved config: {:#?}", config); - - let client = mongodb::Client::with_uri_str(config.mongodb.url.as_str()) - .await - .context("failed to create mongodb client")?; - - if client.default_database().is_none() { - anyhow::bail!("no database specified in config, under [mongodb] url"); - } - - db::init(client, Some("openstream_storage".into())); - - info!("ensuring mongodb collections..."); - db::ensure_collections() - .await - .context("error ensuring mongodb collections and indexes")?; - - let station = match Station::get_by_id(BASE_STATION_ID).await? { - None => anyhow::bail!("cannot find station with id {BASE_STATION_ID}"), - Some(station) => station, - }; - - for i in 1..=C { - create_test_station(i, station.clone()).await? - } - - println!("Done!"); - - Ok(()) -} - -async fn create_test_station(i: usize, base: Station) -> Result<(), anyhow::Error> { - info!("creating test station {i} of {C}"); - let station_id = format!("test{i}"); - let now = serde_util::DateTime::now(); - let station = Station { - id: station_id.clone(), - account_id: base.account_id, - - name: format!("Test Station {i}"), - - playlist_is_randomly_shuffled: false, - source_password: Station::random_source_password(), - - system_metadata: base.system_metadata.clone(), - user_metadata: base.user_metadata.clone(), - - created_at: now, - updated_at: now, - deleted_at: None, - - ..base - }; - - run_transaction!(session => { - tx_try!(Station::insert_with_session(&station, &mut session).await); - }); - - let filter = doc! { AudioFile::KEY_STATION_ID: &base.id }; - - let files: Vec = AudioFile::cl() - .find(filter, None) - .await? - .take(1) - .try_collect() - .await?; - - for base in files { - info!("{} - duplicating file {}", station_id, base.filename); - let order = AudioFile::next_max_order(&station_id, None).await?; - let file_id = AudioFile::uid(); - let file = AudioFile { - id: file_id.clone(), - station_id: station_id.clone(), - bytes_sec: base.bytes_sec, - chunk_count: base.chunk_count, - chunk_duration_ms: base.chunk_duration_ms, - chunk_len: base.chunk_len, - duration_ms: base.duration_ms, - filename: base.filename.clone(), - len: base.len, - sha256: base.sha256.clone(), - metadata: base.metadata.clone(), - order, - created_at: now, - }; - - AudioFile::insert(file).await?; - - let filter = doc! { AudioChunk::KEY_AUDIO_FILE_ID: &base.id }; - let chunks: Vec = AudioChunk::cl() - .find(filter, None) - .await? - .try_collect() - .await?; - - info!("{} - duplicating {} chunks", station_id, chunks.len()); - - for base in chunks { - let chunk = AudioChunk { - id: AudioChunk::uid(), - audio_file_id: file_id.clone(), - station_id: station_id.clone(), - created_at: now, - bytes_sec: base.bytes_sec, - data: base.data, - duration_ms: base.duration_ms, - end_ms: base.end_ms, - i: base.i, - len: base.len, - start_ms: base.start_ms, - }; - - AudioChunk::insert(chunk).await?; - } - } - - Ok(()) -} diff --git a/rs/internal-scripts/ffstress/Cargo.toml b/rs/internal-scripts/ffstress/Cargo.toml deleted file mode 100644 index 026c6cd5..00000000 --- a/rs/internal-scripts/ffstress/Cargo.toml +++ /dev/null @@ -1,10 +0,0 @@ -[package] -name = "ffstress" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -ffmpeg = { version = "0.1.0", path = "../../packages/ffmpeg" } -tokio = { version = "1.29.0", features = ["full"] } diff --git a/rs/internal-scripts/ffstress/src/main.rs b/rs/internal-scripts/ffstress/src/main.rs deleted file mode 100644 index 602a3980..00000000 --- a/rs/internal-scripts/ffstress/src/main.rs +++ /dev/null @@ -1,107 +0,0 @@ -use std::{ - sync::atomic::{AtomicUsize, Ordering}, - time::Duration, -}; - -use ffmpeg::{Ffmpeg, FfmpegConfig, FfmpegSpawn}; -use tokio::io::{AsyncReadExt, AsyncWriteExt}; - -static AUDIO: &[u8] = include_bytes!("../../../../audio/audio.mp3"); - -static CURRENT: AtomicUsize = AtomicUsize::new(0); -static IN: AtomicUsize = AtomicUsize::new(0); -static OUT: AtomicUsize = AtomicUsize::new(0); - -#[tokio::main] -async fn main() { - let c: usize = std::env::var("C") - .expect("C env missing") - .parse() - .expect("C env invalid"); - - let mut handles = vec![]; - - for i in 0..c { - handles.push(tokio::spawn(spawn(i))); - } - - let cmds = async move { - for h in handles { - h.await.expect("tokio::spawn"); - } - }; - - tokio::join!(logger(), cmds); -} - -async fn spawn(_i: usize) { - tokio::spawn(async move { - CURRENT.fetch_add(1, Ordering::SeqCst); - - let config = FfmpegConfig { - readrate: true, - kbitrate: 320_000, - ..Default::default() - }; - - let FfmpegSpawn { - mut stdin, - mut stdout, - child: _child, - .. - } = Ffmpeg::new(config).spawn().expect("ffmpeg spawn"); - - let write = async move { - loop { - for chunk in AUDIO.chunks(256) { - stdin.write_all(chunk).await.expect("ffmpeg write"); - IN.fetch_add(chunk.len(), Ordering::SeqCst); - } - } - }; - - let read = async move { - let mut buf = [0; 1000]; - - loop { - let n = stdout.read(&mut buf).await.expect("ffmpeg read"); - - if n == 0 { - panic!("ffmpeg end"); - } - - OUT.fetch_add(n, Ordering::SeqCst); - } - }; - - tokio::join!(write, read); - }) - .await - .expect("ffmpeg tokio::spawn panic"); - - CURRENT.fetch_sub(1, Ordering::SeqCst); -} - -async fn logger() { - let mut interval = tokio::time::interval(Duration::from_secs(1)); - interval.tick().await; - let mut curr_in = IN.load(Ordering::SeqCst); - let mut curr_out = OUT.load(Ordering::SeqCst); - loop { - interval.tick().await; - let prev_in = curr_in; - let prev_out = curr_out; - curr_in = IN.load(Ordering::SeqCst); - curr_out = OUT.load(Ordering::SeqCst); - - let cmds = CURRENT.load(Ordering::SeqCst); - - let kbps_in = (curr_in - prev_in) * 8 / 1000 / cmds; - let kbps_out = (curr_out - prev_out) * 8 / 1000 / cmds; - - println!("=============================="); - println!("{cmds} stations"); - println!(" IN: {kbps_in:5} kbps/station"); - println!("OUT: {kbps_out:5} kbps/station"); - } -} diff --git a/rs/internal-scripts/hls-logger/Cargo.toml b/rs/internal-scripts/hls-logger/Cargo.toml deleted file mode 100644 index d2a271d8..00000000 --- a/rs/internal-scripts/hls-logger/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "hls-logger" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -hyper = { version = "0.14.27", features = ["full"] } -log = "0.4.17" -logger = { version = "0.1.0", path = "../../packages/logger" } -prex = { version = "0.1.0", path = "../../packages/prex" } -tokio = { version = "1.29.0", features = ["full"] } diff --git a/rs/internal-scripts/hls-logger/src/main.rs b/rs/internal-scripts/hls-logger/src/main.rs deleted file mode 100644 index e4bd9ecb..00000000 --- a/rs/internal-scripts/hls-logger/src/main.rs +++ /dev/null @@ -1,42 +0,0 @@ -use std::net::SocketAddr; - -use hyper::{Server, StatusCode}; -use log::*; -use logger::init; -use prex::{Next, Request, Response}; - -#[tokio::main] -async fn main() { - init(); - - let mut app = prex::prex(); - - app.with(logger); - app.with(ok); - - let app = app.build().expect("prex build"); - - let addr = SocketAddr::from(([0, 0, 0, 0], 21000)); - - let server = Server::bind(&addr); - info!("server bound to {addr}"); - - server.serve(app).await.expect("hyper serve"); -} - -async fn logger(req: Request, next: Next) -> Response { - let method = req.method().to_string(); - let uri = req - .uri() - .path_and_query() - .map(ToString::to_string) - .unwrap_or_default(); - - info!("{method} {uri}"); - - next.run(req).await -} - -async fn ok(_: Request, _: Next) -> Response { - Response::new(StatusCode::OK) -} diff --git a/rs/internal-scripts/migrations/Cargo.toml b/rs/internal-scripts/migrations/Cargo.toml deleted file mode 100644 index 0e0c2996..00000000 --- a/rs/internal-scripts/migrations/Cargo.toml +++ /dev/null @@ -1,34 +0,0 @@ -[package] -name = "migrations" -version = "0.1.0" -edition = "2021" - -[[bin]] -name = "migration-add-audio-file-order" -path = "./src/migration_add_audio_file_order.rs" - -[[bin]] -name = "migration-add-access-token-media-key" -path = "./src/migration_add_access_token_media_key.rs" - -[[bin]] -name = "migration-update-stream-connections-lite" -path = "./src/migration_update_stream_connections_lite.rs" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -anyhow = "1.0.68" -config = { version = "0.1.0", path = "../../packages/config" } -crypt = { version = "0.1.0", path = "../../packages/crypt" } -db = { version = "0.1.0", path = "../../packages/db" } -dotenv = "0.15.0" -futures-util = "0.3.25" -log = "0.4.17" -logger = { version = "0.1.0", path = "../../packages/logger" } -macros = { version = "0.1.0", path = "../../packages/macros" } -mongodb = "2.7.0" -owo-colors = { version = "3.5.0", path = "../../packages/owo-colors" } -serde = { version = "1.0.152", features = ["derive"] } -serde-util = { version = "0.1.0", path = "../../packages/serde-util" } -tokio = { version = "1.29.0", features = ["full"] } diff --git a/rs/internal-scripts/migrations/src/migration_add_access_token_media_key.rs b/rs/internal-scripts/migrations/src/migration_add_access_token_media_key.rs deleted file mode 100644 index b06380e2..00000000 --- a/rs/internal-scripts/migrations/src/migration_add_access_token_media_key.rs +++ /dev/null @@ -1,133 +0,0 @@ -#![allow(unreachable_code)] - -use anyhow::Context; -use config::Config; -use db::access_token::{GeneratedBy, Scope}; -use log::*; -use serde::{Deserialize, Serialize}; -use serde_util::DateTime; - -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { - anyhow::bail!("already migrated"); - shared_init(String::from("./openstream.toml")).await?; - migrate().await?; - Ok(()) -} - -async fn shared_init(config: String) -> Result { - use owo_colors::*; - - logger::init(); - let _ = dotenv::dotenv(); - - let canonical_config_path = std::fs::canonicalize(config.as_str()) - .with_context(|| format!("error loading config file from {}", config.yellow()))?; - - info!( - "loading config file from {}", - canonical_config_path.to_string_lossy().yellow() - ); - - let config = config::load(Some(config)).with_context(|| { - format!( - "error loading config file from {}", - canonical_config_path.to_string_lossy().yellow(), - ) - })?; - - debug!("config loaded: resolved config: {:#?}", config); - - let client_options = mongodb::options::ClientOptions::parse(config.mongodb.url.as_str()) - .await - .context("failed to parse mongodb connection string")?; - - info!("mongodb config hosts: {:?}", client_options.hosts); - info!( - "mongodb client compressors: {:?}", - client_options.compressors - ); - - let client = - mongodb::Client::with_options(client_options).context("failed to create mongodb client")?; - - if client.default_database().is_none() { - anyhow::bail!("no database specified in config, under [mongodb] url"); - } - - info!("mongodb client created"); - - db::init(client, config.mongodb.storage_db_name.clone()); - - // info!("ensuring mongodb collections..."); - // db::ensure_collections() - // .await - // .context("error ensuring mongodb collections and indexes")?; - - Ok(config) -} - -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -#[macros::keys] -pub struct PrevAccessToken { - #[serde(rename = "_id")] - pub id: String, - - pub key: String, - - /// the media_key is used to access streams and files with access token scope directly - /// from the client without exposing a full access token - // pub media_key: String, - - #[serde(flatten)] - pub scope: Scope, - - #[serde(flatten)] - pub generated_by: GeneratedBy, - - pub last_used_at: Option, - - #[serde(with = "serde_util::as_f64")] - pub hits: u64, - - pub created_at: DateTime, - pub deleted_at: Option, -} - -async fn migrate() -> Result<(), mongodb::error::Error> { - use db::models::access_token::AccessToken; - use db::Model; - - let mut count: usize = 0; - db::run_transaction!(session => { - let mut cursor = tx_try!(AccessToken::cl_as::().find_with_session(None, None, &mut session).await); - while let Some(document) = cursor.next(&mut session).await { - let src = tx_try!(document); - let media_key = AccessToken::random_media_key(); - let media_hash = crypt::sha256(media_key); - let target = AccessToken { - id: src.id, - hash: crypt::sha256(src.key), - media_hash, - created_at: src.created_at, - deleted_at: src.deleted_at, - last_used_at: src.last_used_at, - generated_by: src.generated_by, - hits: src.hits, - scope: src.scope, - }; - - - - let r = tx_try!(AccessToken::replace_with_session(&target.id, &target, &mut session).await); - assert_eq!(r.matched_count, 1); - count += 1; - }; - }); - - info!("{} access tokens modified", count); - info!("Bye!"); - - Ok(()) -} diff --git a/rs/internal-scripts/migrations/src/migration_add_access_token_media_key_owner.rs b/rs/internal-scripts/migrations/src/migration_add_access_token_media_key_owner.rs deleted file mode 100644 index b0c1b0c8..00000000 --- a/rs/internal-scripts/migrations/src/migration_add_access_token_media_key_owner.rs +++ /dev/null @@ -1,133 +0,0 @@ -#![allow(unreachable_code)] - -use anyhow::Context; -use config::Config; -use db::access_token::{GeneratedBy, Scope}; -use log::*; -use serde::{Deserialize, Serialize}; -use serde_util::DateTime; - -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { - anyhow::bail!("already migrated"); - shared_init(String::from("./openstream.toml")).await?; - migrate().await?; - Ok(()) -} - -async fn shared_init(config: String) -> Result { - use owo_colors::*; - - logger::init(); - let _ = dotenv::dotenv(); - - let canonical_config_path = std::fs::canonicalize(config.as_str()) - .with_context(|| format!("error loading config file from {}", config.yellow()))?; - - info!( - "loading config file from {}", - canonical_config_path.to_string_lossy().yellow() - ); - - let config = config::load(config).with_context(|| { - format!( - "error loading config file from {}", - canonical_config_path.to_string_lossy().yellow(), - ) - })?; - - debug!("config loaded: resolved config: {:#?}", config); - - let client_options = mongodb::options::ClientOptions::parse(config.mongodb.url.as_str()) - .await - .context("failed to parse mongodb connection string")?; - - info!("mongodb config hosts: {:?}", client_options.hosts); - info!( - "mongodb client compressors: {:?}", - client_options.compressors - ); - - let client = - mongodb::Client::with_options(client_options).context("failed to create mongodb client")?; - - if client.default_database().is_none() { - anyhow::bail!("no database specified in config, under [mongodb] url"); - } - - info!("mongodb client created"); - - db::init(client, config.mongodb.storage_db_name.clone()); - - // info!("ensuring mongodb collections..."); - // db::ensure_collections() - // .await - // .context("error ensuring mongodb collections and indexes")?; - - Ok(config) -} - -#[derive(Debug, Clone, Eq, PartialEq, Serialize, Deserialize)] -#[serde(rename_all = "snake_case")] -#[macros::keys] -pub struct PrevAccessToken { - #[serde(rename = "_id")] - pub id: String, - - pub key: String, - - /// the media_key is used to access streams and files with access token scope directly - /// from the client without exposing a full access token - // pub media_key: String, - - #[serde(flatten)] - pub scope: Scope, - - #[serde(flatten)] - pub generated_by: GeneratedBy, - - pub last_used_at: Option, - - #[serde(with = "serde_util::as_f64")] - pub hits: u64, - - pub created_at: DateTime, - pub deleted_at: Option, -} - -async fn migrate() -> Result<(), mongodb::error::Error> { - use db::models::access_token::AccessToken; - use db::Model; - - let mut count: usize = 0; - db::run_transaction!(session => { - let mut cursor = tx_try!(AccessToken::cl_as::().find_with_session(None, None, &mut session).await); - while let Some(document) = cursor.next(&mut session).await { - let src = tx_try!(document); - let media_key = AccessToken::random_media_key(); - let media_hash = crypt::sha256(media_key); - let target = AccessToken { - id: src.id, - hash: crypt::sha256(src.key), - media_hash, - created_at: src.created_at, - deleted_at: src.deleted_at, - last_used_at: src.last_used_at, - generated_by: src.generated_by, - hits: src.hits, - scope: src.scope, - }; - - - - let r = tx_try!(AccessToken::replace_with_session(&target.id, &target, &mut session).await); - assert_eq!(r.matched_count, 1); - count += 1; - }; - }); - - info!("{} access tokens modified", count); - info!("Bye!"); - - Ok(()) -} diff --git a/rs/internal-scripts/migrations/src/migration_add_audio_file_order.rs b/rs/internal-scripts/migrations/src/migration_add_audio_file_order.rs deleted file mode 100644 index e5142ff2..00000000 --- a/rs/internal-scripts/migrations/src/migration_add_audio_file_order.rs +++ /dev/null @@ -1,132 +0,0 @@ -// use anyhow::Context; -// use config::Config; -// use db::audio_file::Metadata; -// use log::*; -// use serde::{Deserialize, Serialize}; -// use serde_util::{as_f64, DateTime}; - -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { - anyhow::bail!("already migrated"); - // shared_init(String::from("./openstream.toml")).await?; - // migrate().await?; - // Ok(()) -} - -// async fn shared_init(config: String) -> Result { -// use owo_colors::*; - -// logger::init(); -// let _ = dotenv::dotenv(); - -// let canonical_config_path = std::fs::canonicalize(config.as_str()) -// .with_context(|| format!("error loading config file from {}", config.yellow()))?; - -// info!( -// "loading config file from {}", -// canonical_config_path.to_string_lossy().yellow() -// ); - -// let config = config::load(config).with_context(|| { -// format!( -// "error loading config file from {}", -// canonical_config_path.to_string_lossy().yellow(), -// ) -// })?; - -// debug!("config loaded: resolved config: {:#?}", config); - -// let client_options = mongodb::options::ClientOptions::parse(config.mongodb.url.as_str()) -// .await -// .context("failed to parse mongodb connection string")?; - -// info!("mongodb config hosts: {:?}", client_options.hosts); -// info!( -// "mongodb client compressors: {:?}", -// client_options.compressors -// ); - -// let client = mongodb::Client::with_options(client_options.clone()) -// .context("failed to create mongodb client")?; - -// if client.default_database().is_none() { -// anyhow::bail!("no database specified in config, under [mongodb] url"); -// } - -// info!("mongodb client created"); - -// db::init(client, config.mongodb.storage_db_name.clone()); - -// info!("ensuring mongodb collections..."); -// db::ensure_collections() -// .await -// .context("error ensuring mongodb collections and indexes")?; - -// Ok(config) -// } - -// #[derive(Debug, Clone, Serialize, Deserialize)] -// #[serde(rename_all = "snake_case")] -// #[macros::keys] -// pub struct PrevAudioFile { -// #[serde(rename = "_id")] -// pub id: String, -// pub station_id: String, -// pub sha256: String, - -// #[serde(with = "as_f64")] -// pub len: u64, - -// pub duration_ms: f64, - -// #[serde(with = "as_f64")] -// pub bytes_sec: usize, - -// #[serde(with = "as_f64")] -// pub chunk_count: usize, - -// #[serde(with = "as_f64")] -// pub chunk_len: usize, - -// pub chunk_duration_ms: f64, - -// pub filename: String, - -// pub metadata: Metadata, - -// pub created_at: DateTime, -// } - -// async fn migrate() -> Result<(), mongodb::error::Error> { -// use db::models::audio_file::AudioFile; -// use db::models::increment_station_audio_file_order::IncrementStationAudioFileOrder; -// use db::Incrementer; -// use db::Model; -// use mongodb::bson::doc; - -// let count = db::run_transaction!(session => { -// let cl = db::models::audio_file::AudioFile::cl_as::(); - -// let mut count: u64 = 0; - -// let mut cursor = tx_try!(cl.find_with_session(None, None, &mut session).await); -// while let Some(result) = cursor.next(&mut session).await { -// let file = tx_try!(result); -// info!("migrating file {} => {} for station {}", count, file.id, file.station_id); -// let order = tx_try!(IncrementStationAudioFileOrder::next_with_session(&file.station_id, &mut session).await); -// let filter = doc!{ db::KEY_ID: &file.id }; -// let update = doc!{ "$set": { AudioFile::KEY_ORDER: order } }; -// let r = tx_try!(cl.update_one_with_session(filter, update, None, &mut session).await); -// assert_eq!(r.matched_count, 1); -// assert_eq!(r.modified_count, 1); -// count += 1; -// }; - -// count -// }); - -// info!("{} files modified", count); -// info!("Bye!"); - -// Ok(()) -// } diff --git a/rs/internal-scripts/migrations/src/migration_update_stream_connections_lite.rs b/rs/internal-scripts/migrations/src/migration_update_stream_connections_lite.rs deleted file mode 100644 index ab66ab5c..00000000 --- a/rs/internal-scripts/migrations/src/migration_update_stream_connections_lite.rs +++ /dev/null @@ -1,102 +0,0 @@ -#![allow(unreachable_code)] - -use anyhow::Context; -use config::Config; -use futures_util::TryStreamExt; -use log::*; -use mongodb::bson::doc; - -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { - // anyhow::bail!("already migrated"); - shared_init(String::from("./openstream.toml")).await?; - migrate().await?; - Ok(()) -} - -async fn shared_init(config: String) -> Result { - use owo_colors::*; - - logger::init(); - let _ = dotenv::dotenv(); - - let canonical_config_path = std::fs::canonicalize(config.as_str()) - .with_context(|| format!("error loading config file from {}", config.yellow()))?; - - info!( - "loading config file from {}", - canonical_config_path.to_string_lossy().yellow() - ); - - let config = config::load(Some(config)).with_context(|| { - format!( - "error loading config file from {}", - canonical_config_path.to_string_lossy().yellow(), - ) - })?; - - debug!("config loaded: resolved config: {:#?}", config); - - let client_options = mongodb::options::ClientOptions::parse(config.mongodb.url.as_str()) - .await - .context("failed to parse mongodb connection string")?; - - info!("mongodb config hosts: {:?}", client_options.hosts); - info!( - "mongodb client compressors: {:?}", - client_options.compressors - ); - - let client = - mongodb::Client::with_options(client_options).context("failed to create mongodb client")?; - - if client.default_database().is_none() { - anyhow::bail!("no database specified in config, under [mongodb] url"); - } - - info!("mongodb client created"); - - db::init(client, config.mongodb.storage_db_name.clone()); - - // info!("ensuring mongodb collections..."); - // db::ensure_collections() - // .await - // .context("error ensuring mongodb collections and indexes")?; - - Ok(config) -} - -async fn migrate() -> Result<(), mongodb::error::Error> { - use db::stream_connection::lite::StreamConnectionLite; - use db::stream_connection::StreamConnection; - use db::Model; - - db::run_transaction!(session => { - - info!("getting full documents"); - let cursor = tx_try!(StreamConnection::cl().find(doc!{}, None).await); - let full_documents: Vec = tx_try!(cursor.try_collect().await); - - info!("got {} full documents", full_documents.len()); - - info!("removing old documents"); - let r = tx_try!(StreamConnectionLite::cl().delete_many(doc!{}, None).await); - info!("{} old documents deleted", r.deleted_count); - - info!("mapping documents"); - let documents: Vec = full_documents.into_iter().map(StreamConnectionLite::from).collect(); - - info!("{} documents mapped", documents.len()); - - info!("inserting {} documents", documents.len()); - - let r = tx_try!(StreamConnectionLite::cl().insert_many(documents, None).await); - - info!("{} documents inserted", r.inserted_ids.len()); - }); - - info!("OK"); - info!("Bye!"); - - Ok(()) -} diff --git a/rs/internal-scripts/mixed/Cargo.toml b/rs/internal-scripts/mixed/Cargo.toml deleted file mode 100644 index 22f7792a..00000000 --- a/rs/internal-scripts/mixed/Cargo.toml +++ /dev/null @@ -1,13 +0,0 @@ -[package] -name = "mixed" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -bytes = "1.2.1" -ffmpeg = { version = "0.1.0", path = "../../packages/ffmpeg" } -hyper = { version = "0.14.27", features = ["full"] } -prex = { version = "0.1.0", path = "../../packages/prex" } -tokio = { version = "1.29.0", features = ["full"] } diff --git a/rs/internal-scripts/mixed/samples/aac.aac b/rs/internal-scripts/mixed/samples/aac.aac deleted file mode 100644 index 85460baf..00000000 Binary files a/rs/internal-scripts/mixed/samples/aac.aac and /dev/null differ diff --git a/rs/internal-scripts/mixed/samples/mp3.mp3 b/rs/internal-scripts/mixed/samples/mp3.mp3 deleted file mode 100644 index f013a849..00000000 Binary files a/rs/internal-scripts/mixed/samples/mp3.mp3 and /dev/null differ diff --git a/rs/internal-scripts/mixed/samples/ogg.ogg b/rs/internal-scripts/mixed/samples/ogg.ogg deleted file mode 100644 index e81a8e69..00000000 Binary files a/rs/internal-scripts/mixed/samples/ogg.ogg and /dev/null differ diff --git a/rs/internal-scripts/mixed/samples/webm.webm b/rs/internal-scripts/mixed/samples/webm.webm deleted file mode 100644 index 38ae30a8..00000000 Binary files a/rs/internal-scripts/mixed/samples/webm.webm and /dev/null differ diff --git a/rs/internal-scripts/mixed/src/main.rs b/rs/internal-scripts/mixed/src/main.rs deleted file mode 100644 index 85e11656..00000000 --- a/rs/internal-scripts/mixed/src/main.rs +++ /dev/null @@ -1,44 +0,0 @@ -use std::net::SocketAddr; - -use hyper::{header::CONTENT_TYPE, http::HeaderValue, Body, Server, StatusCode}; -use prex::*; - -#[tokio::main] -async fn main() { - let mut app = prex::prex(); - - app.get("/mixed", mixed); - - let app = app.build().unwrap(); - - let addr = SocketAddr::from(([0, 0, 0, 0], 20700)); - - let server = Server::bind(&addr); - - println!("server listening at {addr}"); - - server.serve(app).await.unwrap(); -} - -async fn mixed(_: Request, _: Next) -> Response { - static MP3: &[u8] = include_bytes!("../samples/mp3.mp3"); - static AAC: &[u8] = include_bytes!("../samples/aac.aac"); - //static OGG: &[u8] = include_bytes!("../samples/ogg.ogg"); - //static WEBM: &[u8] = include_bytes!("../samples/webm.webm"); - - let mut body = vec![]; - for _ in 0..1 { - for slice in [AAC, MP3 /* OGG, WEBM*/].into_iter() { - body.extend_from_slice(&slice[17853..]); - } - } - - let mut res = Response::new(StatusCode::OK); - res - .headers_mut() - .append(CONTENT_TYPE, HeaderValue::from_static("audio/mpeg")); - - *res.body_mut() = Body::from(body); - - res -} diff --git a/rs/internal-scripts/producer/Cargo.toml b/rs/internal-scripts/producer/Cargo.toml deleted file mode 100644 index 0402b03b..00000000 --- a/rs/internal-scripts/producer/Cargo.toml +++ /dev/null @@ -1,25 +0,0 @@ -[package] -name = "producer" -version = "0.1.0" -edition = "2021" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -api = { version = "0.1.0", path = "../../packages/api" } -async-stream = "0.3.3" -bytes = "1.2.1" -constants = { version = "0.1.0", path = "../../config/constants" } -db = { version = "0.1.0", path = "../../packages/db" } -dotenv = "0.15.0" -ffmpeg = { version = "0.1.0", path = "../../packages/ffmpeg" } -futures = "0.1.14" -hyper = { version = "0.14.27", features = ["full"] } -multiqueue = "0.3.2" -reqwest = { version = "0.11", default-features = false, features = ["stream", "rustls-tls"] } -serde_json = { version = "1.0", features = ["preserve_order"] } -stream-util = { version = "0.1.0", path = "../../packages/stream-util" } -thiserror = "1.0.38" -tokio = { version = "1.29.0", features = ["full"] } - -[features] diff --git a/rs/internal-scripts/producer/src/main.rs b/rs/internal-scripts/producer/src/main.rs deleted file mode 100644 index 5a0a5696..00000000 --- a/rs/internal-scripts/producer/src/main.rs +++ /dev/null @@ -1,297 +0,0 @@ -use std::{ - str::FromStr, - sync::atomic::{AtomicUsize, Ordering}, -}; - -use bytes::Bytes; -// use ffmpeg::{Ffmpeg, FfmpegConfig, FfmpegSpawn}; -use hyper::{Method, StatusCode}; -use reqwest::{Body, Client, Response, Url}; -// use tokio::io::{AsyncReadExt, AsyncWriteExt}; - -// use multiqueue::{ -// broadcast_fut_queue as channel, -// BroadcastFutReceiver as Receiver, /*BroadcastFutSender as Sender,*/ -// }; - -// use futures::sink::Sink; -// use futures::Future; - -static ACTIVE_COUNT: AtomicUsize = AtomicUsize::new(0); -static TOTAL_COUNT: AtomicUsize = AtomicUsize::new(0); -static ERROR_COUNT: AtomicUsize = AtomicUsize::new(0); -static DATA_WRITTEN: AtomicUsize = AtomicUsize::new(0); - -static AUDIO: Bytes = Bytes::from_static(include_bytes!("../../../../audio/audio.mp3")); - -static TOKEN: &str = "sb8rqnt7nxkx8t8ca3uypg83k87qrw4258cs35s29ekqa4kv"; - -#[derive(Debug, thiserror::Error)] -enum Error { - #[error("hyper: {0}")] - Hyper(#[from] hyper::Error), - #[error("reqwest: {0}")] - Reqwest(#[from] reqwest::Error), -} - -#[tokio::main] -async fn main() { - dotenv::dotenv().expect("dotenv"); - - let s = std::env::var("S") - .expect("S env not present") - .parse::() - .expect("invalid S env"); - let d = std::env::var("D") - .unwrap_or_else(|_| String::from("1000")) - .parse::() - .expect("invalid D env"); - - if s == 0 { - panic!("S env cant be 0"); - } - - // let (tx, rx) = channel::(1024); - - // let config = FfmpegConfig { - // copycodec: true, - // readrate: true, - // ..Default::default() - // }; - - // let spawn = Ffmpeg::new(config).spawn().expect("ffmpeg spawn"); - - // let FfmpegSpawn { - // mut stdin, - // mut stdout, - // mut stderr, - // mut child, - // .. - // } = spawn; - - // let future_stderr = async move { - // let mut buf = String::new(); - // stderr - // .read_to_string(&mut buf) - // .await - // .expect("ffmpeg stderr read to string"); - // buf - // }; - - // let future_stdin = async move { - // loop { - // stdin.write_all(AUDIO.as_ref()).await.expect("ffmpeg write"); - // } - // }; - - // let future_stdout = { - // let tx = tx.clone(); - // async move { - // let mut buf = [0; 256]; - // loop { - // stdout.read_exact(&mut buf).await.expect("ffmpeg read"); - // let bytes = Bytes::copy_from_slice(&buf); - // tx.send(bytes).await.expect("tx send"); - // } - // } - // }; - - // let future_exit = async move { child.wait().await.expect("ffmpeg wait") }; - - // tokio::spawn(future_stdin); - // tokio::spawn(future_stdout); - // tokio::spawn(async move { - // let (exit, stderr) = tokio::join!(future_exit, future_stderr); - // eprintln!("ffmpeg exit: {exit:?}, stderr: {stderr}"); - // panic!("ffmpeg exit"); - // }); - - println!("logger"); - tokio::spawn(logger()); - - let client = reqwest::Client::new(); - - let delay = tokio::time::Duration::from_millis(d); - - for i in 0..s { - tokio::time::sleep(delay).await; - producer(client.clone(), format!("test{}", i + 1)); - } - - // drop(rx); - - tokio::time::sleep(std::time::Duration::from_secs(u64::MAX)).await; -} - -fn producer(client: reqwest::Client, id: String) -> tokio::task::JoinHandle<()> { - tokio::spawn(async move { - let active = ACTIVE_COUNT.fetch_add(1, Ordering::SeqCst) + 1; - let total = TOTAL_COUNT.fetch_add(1, Ordering::SeqCst) + 1; - let errors = ERROR_COUNT.load(Ordering::SeqCst); - - eprintln!("start | active: {active}, total: {total}, errors: {errors}"); - - #[allow(clippy::expect_fun_call)] - let password = get_source_password(&client, &id) - .await - .expect(&format!("get password for {id}")); - - // let stream = async_stream::stream! { - // loop { - // let data = rx.recv().await.expect("rx recv"); - // let len = data.len(); - // yield Ok::(data); - // DATA_WRITTEN.fetch_add(len, Ordering::SeqCst); - // }; - // }; - // let body = Body::wrap_stream(stream); - - let (mut body_sender, body) = hyper::Body::channel(); - - tokio::spawn(async move { - loop { - for data in AUDIO.chunks(256) { - let len = data.len(); - body_sender - .send_data(Bytes::copy_from_slice(data)) - .await - .expect("body send"); - DATA_WRITTEN.fetch_add(len, Ordering::SeqCst); - } - } - }); - - let body = Body::from(body); - - let method = Method::from_str("SOURCE").unwrap(); - - let url = Url::parse(&format!( - "http://source.local.openstream.fm:20600/{id}/source" - )) - .unwrap(); - - let response = client - .request(method, url) - .basic_auth("source", Some(password)) - .body(body) - .send() - .await; - - let active = ACTIVE_COUNT.fetch_sub(1, Ordering::SeqCst) - 1; - let total = TOTAL_COUNT.load(Ordering::SeqCst); - - match response { - Ok(_) => { - eprintln!("end | active: {active}, total: {total}, errors: {errors}"); - } - Err(e) => { - let errors = ERROR_COUNT.fetch_add(1, Ordering::SeqCst) - 1; - eprintln!("error: {e} => {e:?}"); - eprintln!("error | active: {active}, total: {total}, errors: {errors}"); - } - }; - - producer(client, id); - }) -} - -#[derive(Debug, thiserror::Error)] -enum GetPasswordError { - #[error("reqwest fetch: {0}")] - Fetch(#[source] reqwest::Error), - #[error("reqwest body: {0}")] - Body(#[source] reqwest::Error), - #[error("status not ok: {:?}", response.status())] - Status { response: Response }, - #[error("response json: {error}, status: {status:?}")] - Json { - status: StatusCode, - body: Bytes, - #[source] - error: serde_json::Error, - }, -} - -async fn get_source_password( - client: &Client, - station_id: &str, -) -> Result { - let url = Url::parse(&format!( - "https://api.local.openstream.fm/stations/{station_id}" - )) - .unwrap(); - let res = client - .get(url) - .header(constants::ACCESS_TOKEN_HEADER, TOKEN) - .send() - .await - .map_err(GetPasswordError::Fetch)?; - - if !res.status().is_success() { - return Err(GetPasswordError::Status { response: res }); - }; - - let status = res.status(); - let body = res.bytes().await.map_err(GetPasswordError::Body)?; - - let output: api::routes::stations::id::get::Output = match serde_json::from_slice(&body) { - Ok(out) => out, - Err(error) => { - return Err(GetPasswordError::Json { - status, - body, - error, - }) - } - }; - - match output.station { - db::station::PublicStation::Admin(station) => Ok(station.0.source_password), - db::station::PublicStation::User(station) => Ok(station.source_password), - } -} - -async fn logger() { - let mut prev_written = DATA_WRITTEN.load(Ordering::SeqCst); - let mut interval = tokio::time::interval(tokio::time::Duration::from_secs(1)); - loop { - interval.tick().await; - let written = DATA_WRITTEN.load(Ordering::SeqCst); - let tick_written = written - prev_written; - prev_written = written; - let stations = ACTIVE_COUNT.load(Ordering::SeqCst); - let per_station = tick_written / if stations == 0 { 1 } else { stations }; - println!( - "data: {} / s - {} per station ({})", - B(tick_written), - B(per_station), - stations - ) - } -} - -#[derive(Debug, Clone, Copy, Eq, PartialEq)] -struct B(pub usize); - -impl std::fmt::Display for B { - fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { - const K: usize = 1_000; - const M: usize = 1_000_000; - const G: usize = 1_000_000_000; - - let b = self.0; - - if b < K { - write!(f, "{b} B") - } else if b < M { - let k = b as f64 / K as f64; - write!(f, "{:.2} KB", k) - } else if b < G { - let m = b as f64 / M as f64; - write!(f, "{:.2} MB", m) - } else { - let g = b as f64 / G as f64; - write!(f, "{:.2} GB", g) - } - } -} diff --git a/rs/internal-scripts/purge-deleted-items/Cargo.toml b/rs/internal-scripts/purge-deleted-items/Cargo.toml deleted file mode 100644 index b3470e4d..00000000 --- a/rs/internal-scripts/purge-deleted-items/Cargo.toml +++ /dev/null @@ -1,26 +0,0 @@ -[package] -name = "purge-deleted-items" -version = "0.1.0" -edition = "2021" - -[[bin]] -name = "purge-deleted-items" -path = "main.rs" - -# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html - -[dependencies] -anyhow = "1.0.68" -config = { version = "0.1.0", path = "../../packages/config" } -crypt = { version = "0.1.0", path = "../../packages/crypt" } -db = { version = "0.1.0", path = "../../packages/db" } -dotenv = "0.15.0" -futures-util = "0.3.25" -log = "0.4.17" -logger = { version = "0.1.0", path = "../../packages/logger" } -macros = { version = "0.1.0", path = "../../packages/macros" } -mongodb = "2.7.0" -owo-colors = { version = "3.5.0", path = "../../packages/owo-colors" } -serde = { version = "1.0.152", features = ["derive"] } -serde-util = { version = "0.1.0", path = "../../packages/serde-util" } -tokio = { version = "1.29.0", features = ["full"] } diff --git a/rs/internal-scripts/purge-deleted-items/main.rs b/rs/internal-scripts/purge-deleted-items/main.rs deleted file mode 100644 index 1844a4d7..00000000 --- a/rs/internal-scripts/purge-deleted-items/main.rs +++ /dev/null @@ -1,332 +0,0 @@ -#![allow(unreachable_code)] - -use anyhow::Context; -use db::{ - access_token::AccessToken, account::Account, account_invitations::AccountInvitation, - admin::Admin, audio_chunk::AudioChunk, audio_file::AudioFile, - audio_upload_operation::AudioUploadOperation, email_verification_code::EmailVerificationCode, - media_session::MediaSession, play_history_item::PlayHistoryItem, relay_session::RelaySession, - sent_email::SentEmail, station::Station, - station_files_pre_shuffle_checkpoint::StationFilesPreShuffleCheckpoint, - station_picture::StationPicture, station_picture_variant::StationPictureVariant, - token_user_recovery::TokenUserRecovery, user::User, user_account_relation::UserAccountRelation, -}; -use log::*; -use mongodb::bson::doc; -use std::collections::HashMap; - -#[tokio::main] -async fn main() -> Result<(), anyhow::Error> { - shared_init().await?; - purge().await?; - Ok(()) -} - -async fn shared_init() -> Result<(), anyhow::Error> { - logger::init(); - let _ = dotenv::dotenv(); - - let client_options = mongodb::options::ClientOptions::parse( - &std::env::var("OPENSTREAM_PURGE_MONGO_URL") - .context("env.OPENSTREAM_PURGE_MONGO_URL is required")?, - ) - .await - .context("failed to parse mongodb connection string")?; - - info!("mongodb config hosts: {:?}", client_options.hosts); - info!( - "mongodb client compressors: {:?}", - client_options.compressors - ); - - let client = - mongodb::Client::with_options(client_options).context("failed to create mongodb client")?; - - if client.default_database().is_none() { - anyhow::bail!("no database specified in config, under [mongodb] url"); - } - - info!("mongodb client created"); - - db::init( - client, - Some( - std::env::var("OPENSTREAM_PURGE_STORAGE_DB_NAME") - .context("env.OPENSTREAM_PURGE_STORAGE_DB_NAME is required")?, - ), - ); - - // info!("ensuring mongodb collections..."); - // db::ensure_collections() - // .await - // .context("error ensuring mongodb collections and indexes")?; - - // Ok(config) - Ok(()) -} - -async fn purge() -> Result<(), mongodb::error::Error> { - use db::stream_connection::lite::StreamConnectionLite; - use db::stream_connection::StreamConnection; - use db::Model; - - db::run_transaction!(session => { - macro_rules! get_all { - ($ty:ty) => {{ - let mut cursor = tx_try!(<$ty>::cl().find_with_session(None, None, &mut session).await); - let mut docs = Vec::<$ty>::new(); - let mut index = HashMap::::new(); - while let Some(doc) = tx_try!(cursor.next(&mut session).await.transpose()) { - index.insert(doc.id.clone(), doc.clone()); - docs.push(doc); - } - info!("{} {}", docs.len(), stringify!($ty)); - (docs, index) - }} - } - - macro_rules! split { - ($ident:ident) => {{ - let mut current_docs = vec![]; - let mut current_index = HashMap::new(); - let mut deleted_docs = vec![]; - let mut deleted_index = HashMap::new(); - for item in $ident.iter() { - if item.deleted_at.is_none() { - current_docs.push(item.clone()); - current_index.insert(item.id.clone(), item.clone()); - } else { - deleted_docs.push(item.clone()); - deleted_index.insert(item.id.clone(), item.clone()); - } - } - - info!( - "{} current {} - {} deleted {}", - current_docs.len(), - stringify!($ident), - deleted_docs.len(), - stringify!($ident), - ); - - (current_docs, current_index, deleted_docs, deleted_index) - }} - } - - macro_rules! delete_ids { - ($ty:ty, $docs:ident) => {{ - let ids = $docs.iter().map(|doc| doc.id.clone()).collect::>(); - let filter = doc!{ "_id": { "$in": ids } }; - let r = tx_try!(<$ty>::cl().delete_many_with_session(filter, None, &mut session).await); - info!( - "{} {} deleted", - r.deleted_count, - stringify!($ty) - ); - }} - } - - info!("purge transaction started"); - - let (admins, _admins_index) = get_all!(Admin); - let (users, _users_index) = get_all!(User); - let (accounts, _accounts_index) = get_all!(Account); - let (stations, _stations_index) = get_all!(Station); - let (station_pictures, _station_pictures_index) = get_all!(StationPicture); - - //let (user_account_relations, user_account_relations_index) = get_all!(UserAccountRelation); - //let (access_tokens, access_tokens_index) = get_all!(AccessToken); - //let (audio_files, audio_files_index) = get_all!(AudioFile); - //let (audio_chunks, audio_chunks_index) = get_all!(AudioChunk); - //let (account_invitations, account_invitations_index) = get_all!(AccountInvitation); - //let (play_history_items, play_history_items_index) = get_all!(PlayHistoryItem); - //let (stream_connections, stream_connections_index) = get_all!(StreamConnection); - //let (stream_connections_lite, stream_connections_lite_index) = get_all!(StreamConnectionLite); - //let (token_user_recoverys, token_user_recoverys_item) = get_all!(TokenUserRecovery); - //let (email_verification_code, email_verification_code_index) = get_all!(EmailVerificationCode); - //let (sent_email, sent_email_index) = get_all!(SentEmail); - //let (pre_shuffly_checkpoints, pre_shuffle_checkpoints_index) = get_all!(StationFilesPreShuffleCheckpoint); - //let (media_sessions, media_sessions_index) = get_all!(MediaSession); - //let (relay_sessions, relay_sessions_index) = get_all!(RelaySession); - - info!("== GET stage ended =="); - - let ( - _current_admins, - _current_admins_index, - deleted_admins, - _deleted_admins_index, - ) = split!(admins); - delete_ids!(Admin, deleted_admins); - - - let ( - current_users, - _current_users_index, - deleted_users, - _deleted_users_index, - ) = split!(users); - delete_ids!(User, deleted_users); - - let ( - current_accounts, - current_accounts_index, - deleted_accounts, - _deleted_accounts_index, - ) = split!(accounts); - delete_ids!(Account, deleted_accounts); - - let ( - current_stations, - _current_stations_index, - deleted_stations, - _deleted_stations_index, - ) = split!(stations); - - info!("== SPLIT stage ended =="); - - let mut to_delete_stations = deleted_stations.clone(); - let mut current_station_ids = vec![]; - - for item in current_stations { - if !current_accounts_index.contains_key(&item.account_id) { - info!("add to delete station => {} {} {}", item.id, item.name, item.account_id); - to_delete_stations.push(item.clone()); - } else { - current_station_ids.push(item.id.clone()); - } - } - - delete_ids!(Station, to_delete_stations); - - - let r = tx_try!(AccessToken::cl().delete_many_with_session(doc!{}, None, &mut session).await); - info!("{} (all) access tokens deleted", r.deleted_count); - - let current_account_ids = current_accounts.iter().map(|item| item.id.clone()).collect::>(); - let current_user_ids = current_users.iter().map(|item| item.id.clone()).collect::>(); - - // User Account Relationss - { - let filter = doc!{ - "$or": [ - { UserAccountRelation::KEY_ACCOUNT_ID: { "$nin": ¤t_account_ids } }, - { UserAccountRelation::KEY_USER_ID: { "$nin": ¤t_user_ids } } - ] - }; - - let r = tx_try!(UserAccountRelation::cl().delete_many_with_session(filter, None, &mut session).await); - info!("{} user account relations deleted", r.deleted_count); - } - - // Audio files - { - let filter = doc! { - AudioFile::KEY_STATION_ID: { "$nin": ¤t_station_ids } - }; - - let r = tx_try!(AudioFile::cl().delete_many_with_session(filter, None, &mut session).await); - info!("{} audio files deleted", r.deleted_count); - } - - // Audio files - { - let filter = doc! { - AudioChunk::KEY_STATION_ID: { "$nin": ¤t_station_ids } - }; - - let r = tx_try!(AudioChunk::cl().delete_many_with_session(filter, None, &mut session).await); - info!("{} audio chunks deleted", r.deleted_count); - } - - // Play History Items - { - let filter = doc! { - PlayHistoryItem::KEY_STATION_ID: { "$nin": ¤t_station_ids } - }; - - let r = tx_try!(PlayHistoryItem::cl().delete_many_with_session(filter, None, &mut session).await); - info!("{} audio chunks deleted", r.deleted_count); - } - - // Station pictures - let mut current_station_pictures_ids = vec![]; - let mut station_pictures_to_delete = vec![]; - for item in station_pictures { - if current_account_ids.contains(&item.account_id) { - current_station_pictures_ids.push(item.id.clone()); - } else { - station_pictures_to_delete.push(item.clone()); - } - } - - delete_ids!(StationPicture, station_pictures_to_delete); - - // Station pictures variants - { - let filter = doc!{ StationPictureVariant::KEY_PICTURE_ID: { "$nin": ¤t_station_pictures_ids } }; - let r = tx_try!(StationPictureVariant::cl().delete_many_with_session(filter, None, &mut session).await); - info!("{} station picture variants deleted", r.deleted_count); - } - - { - let filter = doc!{ AccountInvitation::KEY_ACCOUNT_ID: { "$nin": ¤t_account_ids } }; - let r = tx_try!(AccountInvitation::cl().delete_many_with_session(filter, None, &mut session).await); - info!("{} account invitations deleted", r.deleted_count); - } - - { - let filter = doc!{ StreamConnection::KEY_STATION_ID: { "$nin": ¤t_station_ids } }; - let r = tx_try!(StreamConnection::cl().delete_many_with_session(filter, None, &mut session).await); - info!("{} stream connections deleted", r.deleted_count); - } - - { - let filter = doc!{ StreamConnectionLite::KEY_STATION_ID: { "$nin": ¤t_station_ids } }; - let r = tx_try!(StreamConnectionLite::cl().delete_many_with_session(filter, None, &mut session).await); - info!("{} stream connections lite deleted", r.deleted_count); - } - - { - let r = tx_try!(AudioUploadOperation::cl().delete_many_with_session(doc!{}, None, &mut session).await); - info!("{} (all) upload operations deleted", r.deleted_count); - } - - { - let r = tx_try!(TokenUserRecovery::cl().delete_many_with_session(doc!{}, None, &mut session).await); - info!("{} (all) token user recoveries deleted", r.deleted_count); - } - - { - let r = tx_try!(StationFilesPreShuffleCheckpoint::cl().delete_many_with_session(doc!{}, None, &mut session).await); - info!("{} (all) shuffle checkpoints deleted", r.deleted_count); - } - - { - let r = tx_try!(EmailVerificationCode::cl().delete_many_with_session(doc!{}, None, &mut session).await); - info!("{} (all) email verification codes deleted", r.deleted_count); - } - - { - let r = tx_try!(SentEmail::cl().delete_many_with_session(doc!{}, None, &mut session).await); - info!("{} (all) sent emails deleted", r.deleted_count); - } - - { - let filter = doc! { MediaSession::KEY_STATION_ID: { "$nin": ¤t_station_ids } }; - let r = tx_try!(MediaSession::cl().delete_many_with_session(filter, None, &mut session).await); - info!("{} media sessions deleted", r.deleted_count); - } - - { - let filter = doc! { MediaSession::KEY_STATION_ID: { "$nin": ¤t_station_ids } }; - let r = tx_try!(RelaySession::cl().delete_many_with_session(filter, None, &mut session).await); - info!("{} relay sessions deleted", r.deleted_count); - } - - //session.abort_transaction().await?; - //info!("transaction aborted"); - }); - - Ok(()) -} diff --git a/serve-docs.sh b/serve-docs.sh deleted file mode 100755 index 3358aded..00000000 --- a/serve-docs.sh +++ /dev/null @@ -1,2 +0,0 @@ -#/bin/bash -serve-directory -l -p 8000 ./target/doc \ No newline at end of file