Skip to content

Commit

Permalink
Merge remote-tracking branch 'upstream/main'
Browse files Browse the repository at this point in the history
  • Loading branch information
dabreadman committed Oct 19, 2023
2 parents 4fc5dc1 + 8968dc8 commit 18a2525
Show file tree
Hide file tree
Showing 66 changed files with 1,185 additions and 191 deletions.
4 changes: 4 additions & 0 deletions build/lib/i18n.resources.json
Original file line number Diff line number Diff line change
Expand Up @@ -82,6 +82,10 @@
"name": "vs/workbench/services/assignment",
"project": "vscode-workbench"
},
{
"name": "vs/workbench/services/auxiliaryWindow",
"project": "vscode-workbench"
},
{
"name": "vs/workbench/contrib/extensions",
"project": "vscode-workbench"
Expand Down
146 changes: 129 additions & 17 deletions cli/src/tunnels/control_server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ use crate::util::http::{
};
use crate::util::io::SilentCopyProgress;
use crate::util::is_integrated_cli;
use crate::util::machine::kill_pid;
use crate::util::os::os_release;
use crate::util::sync::{new_barrier, Barrier, BarrierOpener};

Expand All @@ -29,6 +30,7 @@ use futures::FutureExt;
use opentelemetry::trace::SpanKind;
use opentelemetry::KeyValue;
use std::collections::HashMap;
use std::path::PathBuf;
use std::process::Stdio;
use tokio::pin;
use tokio::process::{ChildStderr, ChildStdin};
Expand All @@ -51,9 +53,10 @@ use super::port_forwarder::{PortForwarding, PortForwardingProcessor};
use super::protocol::{
AcquireCliParams, CallServerHttpParams, CallServerHttpResult, ChallengeIssueParams,
ChallengeIssueResponse, ChallengeVerifyParams, ClientRequestMethod, EmptyObject, ForwardParams,
ForwardResult, FsStatRequest, FsStatResponse, GetEnvResponse, GetHostnameResponse,
HttpBodyParams, HttpHeadersParams, ServeParams, ServerLog, ServerMessageParams, SpawnParams,
SpawnResult, ToClientRequest, UnforwardParams, UpdateParams, UpdateResult, VersionResponse,
ForwardResult, FsReadDirEntry, FsReadDirResponse, FsRenameRequest, FsSinglePathRequest,
FsStatResponse, GetEnvResponse, GetHostnameResponse, HttpBodyParams, HttpHeadersParams,
ServeParams, ServerLog, ServerMessageParams, SpawnParams, SpawnResult, SysKillRequest,
SysKillResponse, ToClientRequest, UnforwardParams, UpdateParams, UpdateResult, VersionResponse,
METHOD_CHALLENGE_VERIFY,
};
use super::server_bridge::ServerBridge;
Expand Down Expand Up @@ -306,10 +309,54 @@ fn make_socket_rpc(

rpc.register_sync("ping", |_: EmptyObject, _| Ok(EmptyObject {}));
rpc.register_sync("gethostname", |_: EmptyObject, _| handle_get_hostname());
rpc.register_sync("fs_stat", |p: FsStatRequest, c| {
rpc.register_sync("sys_kill", |p: SysKillRequest, c| {
ensure_auth(&c.auth_state)?;
handle_sys_kill(p.pid)
});
rpc.register_sync("fs_stat", |p: FsSinglePathRequest, c| {
ensure_auth(&c.auth_state)?;
handle_stat(p.path)
});
rpc.register_duplex(
"fs_read",
1,
move |mut streams, p: FsSinglePathRequest, c| async move {
ensure_auth(&c.auth_state)?;
handle_fs_read(streams.remove(0), p.path).await
},
);
rpc.register_duplex(
"fs_write",
1,
move |mut streams, p: FsSinglePathRequest, c| async move {
ensure_auth(&c.auth_state)?;
handle_fs_write(streams.remove(0), p.path).await
},
);
rpc.register_duplex(
"fs_connect",
1,
move |mut streams, p: FsSinglePathRequest, c| async move {
ensure_auth(&c.auth_state)?;
handle_fs_connect(streams.remove(0), p.path).await
},
);
rpc.register_async("fs_rm", move |p: FsSinglePathRequest, c| async move {
ensure_auth(&c.auth_state)?;
handle_fs_remove(p.path).await
});
rpc.register_sync("fs_mkdirp", |p: FsSinglePathRequest, c| {
ensure_auth(&c.auth_state)?;
handle_fs_mkdirp(p.path)
});
rpc.register_sync("fs_rename", |p: FsRenameRequest, c| {
ensure_auth(&c.auth_state)?;
handle_fs_rename(p.from_path, p.to_path)
});
rpc.register_sync("fs_readdir", |p: FsSinglePathRequest, c| {
ensure_auth(&c.auth_state)?;
handle_fs_readdir(p.path)
});
rpc.register_sync("get_env", |_: EmptyObject, c| {
ensure_auth(&c.auth_state)?;
handle_get_env()
Expand Down Expand Up @@ -820,16 +867,87 @@ fn handle_stat(path: String) -> Result<FsStatResponse, AnyError> {
.map(|m| FsStatResponse {
exists: true,
size: Some(m.len()),
kind: Some(match m.file_type() {
t if t.is_dir() => "dir",
t if t.is_file() => "file",
t if t.is_symlink() => "link",
_ => "unknown",
}),
kind: Some(m.file_type().into()),
})
.unwrap_or_default())
}

async fn handle_fs_read(mut out: DuplexStream, path: String) -> Result<EmptyObject, AnyError> {
let mut f = tokio::fs::File::open(path)
.await
.map_err(|e| wrap(e, "file not found"))?;

tokio::io::copy(&mut f, &mut out)
.await
.map_err(|e| wrap(e, "error reading file"))?;

Ok(EmptyObject {})
}

async fn handle_fs_write(mut input: DuplexStream, path: String) -> Result<EmptyObject, AnyError> {
let mut f = tokio::fs::File::create(path)
.await
.map_err(|e| wrap(e, "file not found"))?;

tokio::io::copy(&mut input, &mut f)
.await
.map_err(|e| wrap(e, "error writing file"))?;

Ok(EmptyObject {})
}

async fn handle_fs_connect(
mut stream: DuplexStream,
path: String,
) -> Result<EmptyObject, AnyError> {
let mut s = get_socket_rw_stream(&PathBuf::from(path))
.await
.map_err(|e| wrap(e, "could not connect to socket"))?;

tokio::io::copy_bidirectional(&mut stream, &mut s)
.await
.map_err(|e| wrap(e, "error copying stream data"))?;

Ok(EmptyObject {})
}

async fn handle_fs_remove(path: String) -> Result<EmptyObject, AnyError> {
tokio::fs::remove_dir_all(path)
.await
.map_err(|e| wrap(e, "error removing directory"))?;
Ok(EmptyObject {})
}

fn handle_fs_rename(from_path: String, to_path: String) -> Result<EmptyObject, AnyError> {
std::fs::rename(from_path, to_path).map_err(|e| wrap(e, "error renaming"))?;
Ok(EmptyObject {})
}

fn handle_fs_mkdirp(path: String) -> Result<EmptyObject, AnyError> {
std::fs::create_dir_all(path).map_err(|e| wrap(e, "error creating directory"))?;
Ok(EmptyObject {})
}

fn handle_fs_readdir(path: String) -> Result<FsReadDirResponse, AnyError> {
let mut entries = std::fs::read_dir(path).map_err(|e| wrap(e, "error listing directory"))?;

let mut contents = Vec::new();
while let Some(Ok(child)) = entries.next() {
contents.push(FsReadDirEntry {
name: child.file_name().to_string_lossy().into_owned(),
kind: child.file_type().ok().map(|v| v.into()),
});
}

Ok(FsReadDirResponse { contents })
}

fn handle_sys_kill(pid: u32) -> Result<SysKillResponse, AnyError> {
Ok(SysKillResponse {
success: kill_pid(pid),
})
}

fn handle_get_env() -> Result<GetEnvResponse, AnyError> {
Ok(GetEnvResponse {
env: std::env::vars().collect(),
Expand Down Expand Up @@ -1110,13 +1228,7 @@ async fn wait_for_process_exit(
mut process: tokio::process::Child,
futs: FuturesUnordered<std::pin::Pin<Box<TokioCopyFuture>>>,
) -> Result<SpawnResult, AnyError> {
let closed = process.wait();
pin!(closed);

let r = tokio::select! {
_ = futures::future::join_all(futs) => closed.await,
r = &mut closed => r
};
let (_, r) = tokio::join!(futures::future::join_all(futs), process.wait());

let r = match r {
Ok(e) => SpawnResult {
Expand Down
67 changes: 65 additions & 2 deletions cli/src/tunnels/protocol.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,17 +133,80 @@ pub struct GetEnvResponse {
pub os_release: String,
}

/// Method: `kill`. Sends a generic, platform-specific kill command to the process.
#[derive(Deserialize)]
pub struct FsStatRequest {
pub struct SysKillRequest {
pub pid: u32,
}

#[derive(Serialize)]
pub struct SysKillResponse {
pub success: bool,
}

/// Methods: `fs_read`/`fs_write`/`fs_rm`/`fs_mkdirp`/`fs_stat`
/// - fs_read: reads into a stream returned from the method,
/// - fs_write: writes from a stream passed to the method.
/// - fs_rm: recursively removes the file
/// - fs_mkdirp: recursively creates the directory
/// - fs_readdir: reads directory contents
/// - fs_stat: stats the given path
/// - fs_connect: connect to the given unix or named pipe socket, streaming
/// data in and out from the method's stream.
#[derive(Deserialize)]
pub struct FsSinglePathRequest {
pub path: String,
}

#[derive(Serialize)]
pub enum FsFileKind {
#[serde(rename = "dir")]
Directory,
#[serde(rename = "file")]
File,
#[serde(rename = "link")]
Link,
}

impl From<std::fs::FileType> for FsFileKind {
fn from(kind: std::fs::FileType) -> Self {
if kind.is_dir() {
Self::Directory
} else if kind.is_file() {
Self::File
} else if kind.is_symlink() {
Self::Link
} else {
unreachable!()
}
}
}

#[derive(Serialize, Default)]
pub struct FsStatResponse {
pub exists: bool,
pub size: Option<u64>,
#[serde(rename = "type")]
pub kind: Option<&'static str>,
pub kind: Option<FsFileKind>,
}

#[derive(Serialize)]
pub struct FsReadDirResponse {
pub contents: Vec<FsReadDirEntry>,
}

#[derive(Serialize)]
pub struct FsReadDirEntry {
pub name: String,
#[serde(rename = "type")]
pub kind: Option<FsFileKind>,
}

/// Method: `fs_reaname`. Renames a file.
#[derive(Deserialize)]
pub struct FsRenameRequest {
pub from_path: String,
pub to_path: String,
}

#[derive(Deserialize, Debug)]
Expand Down
12 changes: 12 additions & 0 deletions cli/src/util/machine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,18 @@ pub fn process_exists(pid: u32) -> bool {
sys.refresh_process(Pid::from_u32(pid))
}

pub fn kill_pid(pid: u32) -> bool {
let mut sys = System::new();
let pid = Pid::from_u32(pid);
sys.refresh_process(pid);

if let Some(p) = sys.process(pid) {
p.kill()
} else {
false
}
}

pub async fn wait_until_process_exits(pid: Pid, poll_ms: u64) {
let mut s = System::new();
let duration = Duration::from_millis(poll_ms);
Expand Down
Loading

0 comments on commit 18a2525

Please sign in to comment.