Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

refactor: reintroduce single device simulator as an uplink built-in #274

Merged
merged 4 commits into from
Sep 4, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
79 changes: 78 additions & 1 deletion Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

31 changes: 8 additions & 23 deletions simulator.sh
Original file line number Diff line number Diff line change
Expand Up @@ -6,27 +6,21 @@ start_devices() {
kill_devices;
mkdir -p devices

echo "Starting uplink and simulator"
echo "Starting uplink with simulator"
devices=($(seq $start $stop))
first=${devices[0]}
rest=${devices[@]:1}
printf -v port "5%04d" $first
download_auth_config $first
create_uplink_config $first $port
create_uplink_config $first
start_uplink 1 $first "-vv" "devices/uplink_$first.log"

sleep 1
start_simulator 1 $first $port "-vv" "devices/simulator_$first.log"

for id in $rest
do
printf -v port "5%04d" $id
sleep 1

download_auth_config $id
create_uplink_config $id $port
create_uplink_config $id
start_uplink 0 $id

sleep 1
start_simulator 0 $id $port
done
echo DONE

Expand All @@ -39,18 +33,17 @@ start_devices() {

create_uplink_config() {
id=${1:?"Missing id"}
port=${2:?"Missing port number"}
printf "$(cat << EOF
processes = []
action_redirections = { send_file = \"load_file\", update_firmware = \"install_firmware\" }
persistence_path = \"/var/tmp/persistence/$id\"

[persistence]
path = \"/var/tmp/persistence/$id\"
max_file_size = 104857600
max_file_count = 3

[tcpapps.1]
port = $port
[simulator]
gps_paths = "./paths"
actions= [{ name = \"load_file\" }, { name = \"install_firmware\" }, { name = \"update_config\" }, { name = \"unlock\" }, { name = \"lock\" }]

[downloader]
Expand Down Expand Up @@ -88,14 +81,6 @@ start_uplink() {
echo $! >> "devices/$2.pid"
}

start_simulator() {
id=${2:?"Missing id"}
port=${3:?"Missing port number"}
cmd="simulator -p $port -g ./paths"
run $1 "$cmd" "$4" "$5"
# simulator runs only as long as associated uplink instance runs and need not be tracked
}

kill_devices() {
echo "Killing all devices in pids file"
for file in $(find ./devices -type f -name "*.pid")
Expand Down
1 change: 1 addition & 0 deletions uplink/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ tracing-subscriber = { version="=0.3.14", features=["env-filter"] }
tokio-compat-02 = "0.2.0"
tunshell-client = { git = "https://github.com/bytebeamio/tunshell.git", branch = "android_patch" }
# simulator
fake = { version = "2.5.0", features = ["derive"] }
rand = "0.8"
# downloader
futures-util = "0.3"
Expand Down
6 changes: 0 additions & 6 deletions uplink/src/base/actions.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,6 @@ use super::clock;
/// said device, in this case, uplink.
#[derive(Debug, Clone, Serialize, Deserialize)]
pub struct Action {
#[serde(skip)]
pub device_id: Option<String>,
// action id
#[serde(alias = "id")]
pub action_id: String,
Expand All @@ -27,8 +25,6 @@ pub struct Action {
pub struct ActionResponse {
#[serde(alias = "id")]
pub action_id: String,
#[serde(skip)]
pub device_id: Option<String>,
// sequence number
pub sequence: u32,
// timestamp
Expand All @@ -49,7 +45,6 @@ impl ActionResponse {

ActionResponse {
action_id: id.to_owned(),
device_id: None,
sequence: 0,
timestamp,
state: state.to_owned(),
Expand Down Expand Up @@ -113,7 +108,6 @@ impl From<&ActionResponse> for Payload {
fn from(resp: &ActionResponse) -> Self {
Self {
stream: "action_status".to_owned(),
device_id: resp.device_id.to_owned(),
sequence: resp.sequence,
timestamp: resp.timestamp,
payload: json!({
Expand Down
12 changes: 0 additions & 12 deletions uplink/src/base/bridge/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -68,8 +68,6 @@ pub trait Package: Send + Debug {
pub struct Payload {
#[serde(skip_serializing)]
pub stream: String,
#[serde(skip)]
pub device_id: Option<String>,
pub sequence: u32,
pub timestamp: u64,
#[serde(flatten)]
Expand Down Expand Up @@ -608,7 +606,6 @@ mod tests {
std::thread::sleep(Duration::from_secs(1));

let action_1 = Action {
device_id: None,
action_id: "1".to_string(),
kind: "test".to_string(),
name: "route_1".to_string(),
Expand All @@ -630,7 +627,6 @@ mod tests {
assert_eq!(elapsed / 1000, 10);

let action_2 = Action {
device_id: None,
action_id: "2".to_string(),
kind: "test".to_string(),
name: "route_2".to_string(),
Expand Down Expand Up @@ -668,7 +664,6 @@ mod tests {
std::thread::sleep(Duration::from_secs(1));

let action_1 = Action {
device_id: None,
action_id: "1".to_string(),
kind: "test".to_string(),
name: "test".to_string(),
Expand All @@ -681,7 +676,6 @@ mod tests {
assert_eq!(status.state, "Received".to_owned());

let action_2 = Action {
device_id: None,
action_id: "2".to_string(),
kind: "test".to_string(),
name: "test".to_string(),
Expand Down Expand Up @@ -715,7 +709,6 @@ mod tests {
std::thread::sleep(Duration::from_secs(1));

let action = Action {
device_id: None,
action_id: "1".to_string(),
kind: "test".to_string(),
name: "test".to_string(),
Expand Down Expand Up @@ -768,7 +761,6 @@ mod tests {
std::thread::sleep(Duration::from_secs(1));

let action = Action {
device_id: None,
action_id: "1".to_string(),
kind: "test".to_string(),
name: "test".to_string(),
Expand Down Expand Up @@ -826,7 +818,6 @@ mod tests {
std::thread::sleep(Duration::from_secs(1));

let action = Action {
device_id: None,
action_id: "1".to_string(),
kind: "tunshell".to_string(),
name: "launch_shell".to_string(),
Expand All @@ -837,7 +828,6 @@ mod tests {
std::thread::sleep(Duration::from_secs(1));

let action = Action {
device_id: None,
action_id: "2".to_string(),
kind: "test".to_string(),
name: "test".to_string(),
Expand Down Expand Up @@ -905,7 +895,6 @@ mod tests {
std::thread::sleep(Duration::from_secs(1));

let action = Action {
device_id: None,
action_id: "1".to_string(),
kind: "test".to_string(),
name: "test".to_string(),
Expand All @@ -916,7 +905,6 @@ mod tests {
std::thread::sleep(Duration::from_secs(1));

let action = Action {
device_id: None,
action_id: "2".to_string(),
kind: "tunshell".to_string(),
name: "launch_shell".to_string(),
Expand Down
22 changes: 9 additions & 13 deletions uplink/src/base/bridge/streams.rs
Original file line number Diff line number Diff line change
Expand Up @@ -47,29 +47,25 @@ impl Streams {
}

pub async fn forward(&mut self, data: Payload) {
let stream_name = &data.stream;
let (stream_id, device_id) = match &data.device_id {
Some(device_id) => (stream_name.to_owned() + "/" + device_id, device_id.to_owned()),
_ => (stream_name.to_owned(), self.config.device_id.to_owned()),
};
let stream_name = data.stream.to_owned();

let stream = match self.map.get_mut(&stream_id) {
let stream = match self.map.get_mut(&stream_name) {
Some(partition) => partition,
None => {
if self.config.simulator.is_none() && self.map.keys().len() > 20 {
error!("Failed to create {:?} stream. More than max 20 streams", stream_id);
error!("Failed to create {:?} stream. More than max 20 streams", stream_name);
return;
}

let stream = Stream::dynamic(
stream_name,
&stream_name,
&self.config.project_id,
&device_id,
&self.config.device_id,
MAX_BUFFER_SIZE,
self.data_tx.clone(),
);

self.map.entry(stream_id.to_owned()).or_insert(stream)
self.map.entry(stream_name.to_owned()).or_insert(stream)
}
};

Expand All @@ -87,10 +83,10 @@ impl Streams {
// Warn in case stream flushed stream was not in the queue.
if max_stream_size > 1 {
match state {
StreamStatus::Flushed => self.stream_timeouts.remove(&stream_id),
StreamStatus::Flushed => self.stream_timeouts.remove(&stream_name),
StreamStatus::Init(flush_period) => {
trace!("Initialized stream buffer for {stream_id}");
self.stream_timeouts.insert(&stream_id, flush_period);
trace!("Initialized stream buffer for {stream_name}");
self.stream_timeouts.insert(&stream_name, flush_period);
}
StreamStatus::Partial(_l) => {}
}
Expand Down
Loading
Loading