Skip to content

Commit

Permalink
[Agent] modify wasm http hook
Browse files Browse the repository at this point in the history
  • Loading branch information
qq906907952 authored and rvql committed Aug 31, 2023
1 parent 266a4e3 commit 571099c
Show file tree
Hide file tree
Showing 7 changed files with 99 additions and 175 deletions.
Binary file modified agent/resources/test/plugins/wasm_test.wasm.gz
Binary file not shown.
50 changes: 43 additions & 7 deletions agent/src/flow_generator/protocol_logs/http.rs
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ use super::{decode_new_rpc_trace_context_with_type, LogMessageType};

use crate::common::flow::L7PerfStats;
use crate::common::l7_protocol_log::L7ParseResult;
use crate::plugin::CustomInfo;
use crate::{
common::{
ebpf::EbpfType,
Expand Down Expand Up @@ -99,12 +100,51 @@ pub struct HttpInfo {
#[serde(rename = "response_code", skip_serializing_if = "Option::is_none")]
pub status_code: Option<i32>,
#[serde(rename = "response_status")]
status: L7ResponseStatus,
pub status: L7ResponseStatus,

#[serde(skip)]
attributes: Vec<KeyVal>,
}

impl HttpInfo {
pub fn merge_custom_to_http1(&mut self, custom: CustomInfo) {
// req rewrite
if !custom.req.domain.is_empty() {
self.host = custom.req.domain;
}

if !custom.req.req_type.is_empty() {
self.method = custom.req.req_type;
}

if !custom.req.resource.is_empty() {
self.path = custom.req.resource;
}

//req write
if custom.resp.code.is_some() {
self.status_code = custom.resp.code;
}

if custom.resp.status != self.status {
self.status = custom.resp.status;
}

//trace info rewrite
if custom.trace.trace_id.is_some() {
self.trace_id = custom.trace.trace_id.unwrap();
}
if custom.trace.span_id.is_some() {
self.span_id = custom.trace.span_id.unwrap();
}

// extend attribute
if !custom.attributes.is_empty() {
self.attributes.extend(custom.attributes);
}
}
}

impl L7ProtocolInfoInterface for HttpInfo {
fn session_id(&self) -> Option<u32> {
self.stream_id
Expand Down Expand Up @@ -999,12 +1039,8 @@ impl HttpLog {
PacketDirection::ClientToServer => vm.on_http_req(payload, param, info),
PacketDirection::ServerToClient => vm.on_http_resp(payload, param, info),
}
.map(|(trace, kv)| {
if let Some(trace) = trace {
trace.trace_id.map(|s| info.trace_id = s);
trace.span_id.map(|s| info.span_id = s);
}
info.attributes.extend(kv);
.map(|custom| {
info.merge_custom_to_http1(custom);
});
}
}
Expand Down
126 changes: 5 additions & 121 deletions agent/src/plugin/wasm/abi_import.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,15 @@
*/

use crate::{
flow_generator::protocol_logs::pb_adapter::{KeyVal, TraceInfo},
plugin::{wasm::IMPORT_FUNC_WASM_LOG, CustomInfo},
wasm_error,
};

use super::{
read_wasm_str, StoreDataType, VmParseCtx, VmResult, IMPORT_FUNC_HOST_READ_HTTP_RESULT,
IMPORT_FUNC_HOST_READ_L7_PROTOCOL_INFO, IMPORT_FUNC_HOST_READ_STR_RESULT,
IMPORT_FUNC_VM_READ_CTX_BASE, IMPORT_FUNC_VM_READ_HTTP_REQ, IMPORT_FUNC_VM_READ_HTTP_RESP,
IMPORT_FUNC_VM_READ_PAYLOAD, LOG_LEVEL_ERR, LOG_LEVEL_INFO, LOG_LEVEL_WARN, WASM_MODULE_NAME,
read_wasm_str, StoreDataType, VmParseCtx, VmResult, IMPORT_FUNC_HOST_READ_L7_PROTOCOL_INFO,
IMPORT_FUNC_HOST_READ_STR_RESULT, IMPORT_FUNC_VM_READ_CTX_BASE, IMPORT_FUNC_VM_READ_HTTP_REQ,
IMPORT_FUNC_VM_READ_HTTP_RESP, IMPORT_FUNC_VM_READ_PAYLOAD, LOG_LEVEL_ERR, LOG_LEVEL_INFO,
LOG_LEVEL_WARN, WASM_MODULE_NAME,
};

use log::{error, info, warn};
Expand Down Expand Up @@ -341,119 +340,11 @@ pub(super) fn host_read_l7_protocol_info(
.as_mut()
.unwrap()
.get_ctx_base_mut()
.set_result(VmResult::ParsePayloadResult(infos));
.set_result(VmResult::L7InfoResult(infos));

1
}

/*
import function, host read the serialized http result and deserizlize to CustomInfo.
correspond to go func signature:
//go:wasm-module deepflow
//export host_read_http_result
func hostReadHttpResult(b *byte, length int) bool
has trace: 1 byte
if has trace
trace_id, span_id, parent_span_id
(
str len: 2 bytes
str: $(str len) bytes
) x 3
(
key len: 2 bytes
key: $(key len) bytes
val len: 2 bytes
val: $(val len) bytes
) x n
*/
pub(super) fn host_read_http_result(
mut caller: Caller<'_, StoreDataType>,
b: i32,
len: i32,
) -> i32 {
if !check_memory(&mut caller, b, len, IMPORT_FUNC_HOST_READ_HTTP_RESULT) {
return 0;
}
let mut kv = vec![];
let mem = caller.get_export("memory").unwrap().into_memory().unwrap();
let mem = mem.data(caller.as_context());
let data = &mem[b as usize..(b + len) as usize];
let ins_name = caller.data().parse_ctx.as_ref().unwrap().get_ins_name();

let mut trace_info = None;
let mut off = 0;
let has_trace = data[off];
off += 1;

match has_trace {
0 => {}
1 => {
let mut i = TraceInfo::default();
if read_wasm_str(data, &mut off)
.and_then(|s| {
if !s.is_empty() {
i.trace_id = Some(s);
}
read_wasm_str(data, &mut off)
})
.and_then(|s| {
if !s.is_empty() {
i.span_id = Some(s);
}
read_wasm_str(data, &mut off)
})
.and_then(|s| {
if !s.is_empty() {
i.parent_span_id = Some(s);
}
Some(())
})
.is_none()
{
wasm_error!(
ins_name,
IMPORT_FUNC_HOST_READ_HTTP_RESULT,
"read trace info fail"
);
return 0;
}
trace_info = Some(i);
}
_ => {
wasm_error!(
ins_name,
IMPORT_FUNC_HOST_READ_HTTP_RESULT,
"read http result fail, has trace is unexpected value"
);
return 0;
}
}

loop {
let (Some(key), Some(val)) = (read_wasm_str(data, &mut off), read_wasm_str(data, &mut off))
else {
break;
};
kv.push(KeyVal { key, val });
}

caller
.data_mut()
.parse_ctx
.as_mut()
.unwrap()
.get_ctx_base_mut()
.set_result(VmResult::HTTPResult(trace_info, kv));
1
}

/*
import function, host read the serialized http result and deserizlize to CustomInfo.
Expand Down Expand Up @@ -534,13 +425,6 @@ pub(super) fn get_linker(e: Engine, store: &mut Store<StoreDataType>) -> Linker<
)
.unwrap();

link.func_wrap(
WASM_MODULE_NAME,
IMPORT_FUNC_HOST_READ_HTTP_RESULT,
host_read_http_result,
)
.unwrap();

link.func_wrap(
WASM_MODULE_NAME,
IMPORT_FUNC_HOST_READ_STR_RESULT,
Expand Down
20 changes: 8 additions & 12 deletions agent/src/plugin/wasm/host.rs
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,7 @@ use wasmtime_wasi::{WasiCtx, WasiCtxBuilder};
use crate::{
common::l7_protocol_log::ParseParam,
flow_generator::{
protocol_logs::{
pb_adapter::{KeyVal, TraceInfo},
HttpInfo,
},
protocol_logs::HttpInfo,
Error::{self, WasmVmError},
Result as WasmResult,
},
Expand Down Expand Up @@ -63,7 +60,6 @@ pub(super) const IMPORT_FUNC_VM_READ_PAYLOAD: &str = "vm_read_payload";
pub(super) const IMPORT_FUNC_VM_READ_HTTP_REQ: &str = "vm_read_http_req_info";
pub(super) const IMPORT_FUNC_VM_READ_HTTP_RESP: &str = "vm_read_http_resp_info";
pub(super) const IMPORT_FUNC_HOST_READ_L7_PROTOCOL_INFO: &str = "host_read_l7_protocol_info";
pub(super) const IMPORT_FUNC_HOST_READ_HTTP_RESULT: &str = "host_read_http_result";
pub(super) const IMPORT_FUNC_HOST_READ_STR_RESULT: &str = "host_read_str_result";

pub(super) const LOG_LEVEL_INFO: i32 = 0;
Expand Down Expand Up @@ -244,7 +240,7 @@ impl WasmVm {
.parse_ctx
.as_mut()
.unwrap()
.take_parse_payload_result()
.take_l7_info_result()
.map_or(Some(vec![]), |info| Some(info));
break;
}
Expand All @@ -259,7 +255,7 @@ impl WasmVm {
payload: &[u8],
param: &ParseParam,
info: &HttpInfo,
) -> Option<(Option<TraceInfo>, Vec<KeyVal>)> {
) -> Option<CustomInfo> {
if self.instance.len() == 0 {
return None;
}
Expand Down Expand Up @@ -326,8 +322,8 @@ impl WasmVm {
.parse_ctx
.as_mut()
.unwrap()
.take_http_result()
.map_or(None, |r| Some(r));
.take_l7_info_result()
.map_or(None, |mut r| r.pop());

break;
}
Expand All @@ -342,7 +338,7 @@ impl WasmVm {
payload: &[u8],
param: &ParseParam,
info: &HttpInfo,
) -> Option<(Option<TraceInfo>, Vec<KeyVal>)> {
) -> Option<CustomInfo> {
if self.instance.len() == 0 {
return None;
}
Expand Down Expand Up @@ -409,8 +405,8 @@ impl WasmVm {
.parse_ctx
.as_mut()
.unwrap()
.take_http_result()
.map_or(None, |r| Some(r));
.take_l7_info_result()
.map_or(None, |mut r| r.pop());

break;
}
Expand Down
9 changes: 4 additions & 5 deletions agent/src/plugin/wasm/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -114,11 +114,10 @@ mod vm;

use host::{
StoreDataType, EXPORT_FUNC_CHECK_PAYLOAD, EXPORT_FUNC_GET_HOOK_BITMAP, EXPORT_FUNC_ON_HTTP_REQ,
EXPORT_FUNC_ON_HTTP_RESP, EXPORT_FUNC_PARSE_PAYLOAD, IMPORT_FUNC_HOST_READ_HTTP_RESULT,
IMPORT_FUNC_HOST_READ_L7_PROTOCOL_INFO, IMPORT_FUNC_HOST_READ_STR_RESULT,
IMPORT_FUNC_VM_READ_CTX_BASE, IMPORT_FUNC_VM_READ_HTTP_REQ, IMPORT_FUNC_VM_READ_HTTP_RESP,
IMPORT_FUNC_VM_READ_PAYLOAD, IMPORT_FUNC_WASM_LOG, LOG_LEVEL_ERR, LOG_LEVEL_INFO,
LOG_LEVEL_WARN, WASM_MODULE_NAME,
EXPORT_FUNC_ON_HTTP_RESP, EXPORT_FUNC_PARSE_PAYLOAD, IMPORT_FUNC_HOST_READ_L7_PROTOCOL_INFO,
IMPORT_FUNC_HOST_READ_STR_RESULT, IMPORT_FUNC_VM_READ_CTX_BASE, IMPORT_FUNC_VM_READ_HTTP_REQ,
IMPORT_FUNC_VM_READ_HTTP_RESP, IMPORT_FUNC_VM_READ_PAYLOAD, IMPORT_FUNC_WASM_LOG,
LOG_LEVEL_ERR, LOG_LEVEL_INFO, LOG_LEVEL_WARN, WASM_MODULE_NAME,
};
use public::bytes::read_u16_be;
use vm::{VmCtxBase, VmHttpReqCtx, VmHttpRespCtx, VmParseCtx, VmResult};
Expand Down
Loading

0 comments on commit 571099c

Please sign in to comment.