diff --git a/agent/resources/test/flow_generator/http/h2c_ascii.result b/agent/resources/test/flow_generator/http/h2c_ascii.result index f78cafb16b6..6d50d00d5d4 100644 --- a/agent/resources/test/flow_generator/http/h2c_ascii.result +++ b/agent/resources/test/flow_generator/http/h2c_ascii.result @@ -1,2 +1,2 @@ -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Grpc, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: Some(1), version: "2", trace_id: "", span_id: "", method: "POST", path: "/hipstershop.CartService/GetCart", host: "cartservice:7070", user_agent: Some("grpc-go/1.22.0"), referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: Some(43), resp_content_length: None, status_code: None, status: Ok, attributes: [] } is_http: true -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Grpc, is_tls: false, msg_type: Response, raw_data_type: RawProtocol, stream_id: Some(1), version: "2", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: Some(21), status_code: Some(200), status: Ok, attributes: [] } is_http: true +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Grpc, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: Some(1), version: "2", trace_id: "", span_id: "", method: "POST", path: "/hipstershop.CartService/GetCart", host: "cartservice:7070", user_agent: Some("grpc-go/1.22.0"), referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: Some(43), resp_content_length: None, status_code: None, status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: true +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Grpc, is_tls: false, msg_type: Response, raw_data_type: RawProtocol, stream_id: Some(1), version: "2", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: Some(21), status_code: Some(200), status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: true diff --git a/agent/resources/test/flow_generator/http/httpv1.result b/agent/resources/test/flow_generator/http/httpv1.result index 9b86c39479e..292529c522a 100644 --- a/agent/resources/test/flow_generator/http/httpv1.result +++ b/agent/resources/test/flow_generator/http/httpv1.result @@ -1,2 +1,2 @@ -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "", span_id: "", method: "POST", path: "/query?1590632942", host: "rq.cct.cloud.duba.net", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: Some(85), resp_content_length: None, status_code: None, status: Ok, attributes: [] } is_http: true -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Response, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: Some(54), status_code: Some(200), status: Ok, attributes: [] } is_http: true +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "", span_id: "", method: "POST", path: "/query?1590632942", host: "rq.cct.cloud.duba.net", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: Some(85), resp_content_length: None, status_code: None, status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: true +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Response, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: Some(54), status_code: Some(200), status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: true diff --git a/agent/resources/test/flow_generator/http/httpv2-stream-id.result b/agent/resources/test/flow_generator/http/httpv2-stream-id.result index 8c90113f76d..99bcf62c0d6 100644 --- a/agent/resources/test/flow_generator/http/httpv2-stream-id.result +++ b/agent/resources/test/flow_generator/http/httpv2-stream-id.result @@ -1 +1 @@ -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http2, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: Some(1392369), version: "2", trace_id: "", span_id: "", method: "POST", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: Some(0), resp_content_length: None, status_code: None, status: Ok, attributes: [] } is_http: true +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http2, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: Some(1392369), version: "2", trace_id: "", span_id: "", method: "POST", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: Some(0), resp_content_length: None, status_code: None, status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: true diff --git a/agent/resources/test/flow_generator/http/istio-tcp-frag.result b/agent/resources/test/flow_generator/http/istio-tcp-frag.result index 34fda1fb3cf..49298ea9315 100644 --- a/agent/resources/test/flow_generator/http/istio-tcp-frag.result +++ b/agent/resources/test/flow_generator/http/istio-tcp-frag.result @@ -1,2 +1,2 @@ -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "", span_id: "", method: "GET", path: "/productpage", host: "productpage:9080", user_agent: Some("curl/7.81.0-DEV"), referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: None, status: Ok, attributes: [] } is_http: true -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Response, raw_data_type: RawProtocol, stream_id: None, version: "1.0", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: Some(200), status: Ok, attributes: [] } is_http: true +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "", span_id: "", method: "GET", path: "/productpage", host: "productpage:9080", user_agent: Some("curl/7.81.0-DEV"), referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: None, status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: true +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Response, raw_data_type: RawProtocol, stream_id: None, version: "1.0", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: Some(200), status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: true diff --git a/agent/resources/test/flow_generator/http/sw8.result b/agent/resources/test/flow_generator/http/sw8.result index 9d31a5af1ec..71e39678e9e 100644 --- a/agent/resources/test/flow_generator/http/sw8.result +++ b/agent/resources/test/flow_generator/http/sw8.result @@ -1,9 +1,9 @@ -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "3912196de0cf41f4bab8a8a8108fc3a8.63.16294441329780027", span_id: "3912196de0cf41f4bab8a8a8108fc3a8.63.16294441329780026-4", method: "POST", path: "/createOrder", host: "10.100.18.175:20880", user_agent: Some("Apache-HttpClient/4.5.10 (Java/1.8.0_212)"), referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: Some(351), resp_content_length: None, status_code: None, status: Ok, attributes: [] } is_http: true -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Response, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: Some(200), status: Ok, attributes: [] } is_http: true -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Other, raw_data_type: RawProtocol, stream_id: None, version: "", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: None, status: Ok, attributes: [] } is_http: false -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "3912196de0cf41f4bab8a8a8108fc3a8.65.16294441341700021", span_id: "3912196de0cf41f4bab8a8a8108fc3a8.65.16294441341700020-3", method: "POST", path: "/createOrder", host: "10.100.18.175:20880", user_agent: Some("Apache-HttpClient/4.5.10 (Java/1.8.0_212)"), referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: Some(247), resp_content_length: None, status_code: None, status: Ok, attributes: [] } is_http: true -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Response, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: Some(200), status: Ok, attributes: [] } is_http: true -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Other, raw_data_type: RawProtocol, stream_id: None, version: "", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: None, status: Ok, attributes: [] } is_http: false -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "3912196de0cf41f4bab8a8a8108fc3a8.56.16294441349520027", span_id: "3912196de0cf41f4bab8a8a8108fc3a8.56.16294441349520026-4", method: "POST", path: "/createOrder", host: "10.100.18.175:20880", user_agent: Some("Apache-HttpClient/4.5.10 (Java/1.8.0_212)"), referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: Some(350), resp_content_length: None, status_code: None, status: Ok, attributes: [] } is_http: true -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Response, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: Some(200), status: Ok, attributes: [] } is_http: true -HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Other, raw_data_type: RawProtocol, stream_id: None, version: "", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: None, status: Ok, attributes: [] } is_http: false +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "3912196de0cf41f4bab8a8a8108fc3a8.63.16294441329780027", span_id: "3912196de0cf41f4bab8a8a8108fc3a8.63.16294441329780026-4", method: "POST", path: "/createOrder", host: "10.100.18.175:20880", user_agent: Some("Apache-HttpClient/4.5.10 (Java/1.8.0_212)"), referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: Some(351), resp_content_length: None, status_code: None, status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: true +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Response, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: Some(200), status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: true +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Other, raw_data_type: RawProtocol, stream_id: None, version: "", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: None, status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: false +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "3912196de0cf41f4bab8a8a8108fc3a8.65.16294441341700021", span_id: "3912196de0cf41f4bab8a8a8108fc3a8.65.16294441341700020-3", method: "POST", path: "/createOrder", host: "10.100.18.175:20880", user_agent: Some("Apache-HttpClient/4.5.10 (Java/1.8.0_212)"), referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: Some(247), resp_content_length: None, status_code: None, status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: true +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Response, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: Some(200), status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: true +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Other, raw_data_type: RawProtocol, stream_id: None, version: "", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: None, status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: false +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Request, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "3912196de0cf41f4bab8a8a8108fc3a8.56.16294441349520027", span_id: "3912196de0cf41f4bab8a8a8108fc3a8.56.16294441349520026-4", method: "POST", path: "/createOrder", host: "10.100.18.175:20880", user_agent: Some("Apache-HttpClient/4.5.10 (Java/1.8.0_212)"), referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: Some(350), resp_content_length: None, status_code: None, status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: true +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Response, raw_data_type: RawProtocol, stream_id: None, version: "1.1", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: Some(200), status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: true +HttpInfo { is_req_end: false, is_resp_end: false, rrt: 0, proto: Http1, is_tls: false, msg_type: Other, raw_data_type: RawProtocol, stream_id: None, version: "", trace_id: "", span_id: "", method: "", path: "", host: "", user_agent: None, referer: None, client_ip: "", x_request_id_0: "", x_request_id_1: "", req_content_length: None, resp_content_length: None, status_code: None, status: Ok, custom_endpoint: None, custom_result: None, custom_exception: None, attributes: [] } is_http: false diff --git a/agent/resources/test/plugins/wasm_test.wasm.gz b/agent/resources/test/plugins/wasm_test.wasm.gz index cb09f464281..c8b288e1d9c 100644 Binary files a/agent/resources/test/plugins/wasm_test.wasm.gz and b/agent/resources/test/plugins/wasm_test.wasm.gz differ diff --git a/agent/src/flow_generator/protocol_logs/http.rs b/agent/src/flow_generator/protocol_logs/http.rs index 9e3b919d3ee..a19e7d67e9f 100644 --- a/agent/src/flow_generator/protocol_logs/http.rs +++ b/agent/src/flow_generator/protocol_logs/http.rs @@ -28,6 +28,7 @@ use super::{decode_new_rpc_trace_context_with_type, LogMessageType}; use crate::common::flow::L7PerfStats; use crate::common::l7_protocol_log::L7ParseResult; +use crate::plugin::CustomInfo; use crate::{ common::{ ebpf::EbpfType, @@ -99,12 +100,68 @@ pub struct HttpInfo { #[serde(rename = "response_code", skip_serializing_if = "Option::is_none")] pub status_code: Option, #[serde(rename = "response_status")] - status: L7ResponseStatus, + pub status: L7ResponseStatus, + + // set by wasm plugin + custom_endpoint: Option, + custom_result: Option, + custom_exception: Option, #[serde(skip)] attributes: Vec, } +impl HttpInfo { + pub fn merge_custom_to_http1(&mut self, custom: CustomInfo) { + // req rewrite + if !custom.req.domain.is_empty() { + self.host = custom.req.domain; + } + + if !custom.req.req_type.is_empty() { + self.method = custom.req.req_type; + } + + if !custom.req.resource.is_empty() { + self.path = custom.req.resource; + } + + if !custom.req.endpoint.is_empty() { + self.custom_endpoint = Some(custom.req.endpoint) + } + + //req write + if custom.resp.code.is_some() { + self.status_code = custom.resp.code; + } + + if custom.resp.status != self.status { + self.status = custom.resp.status; + } + + if !custom.resp.result.is_empty() { + self.custom_result = Some(custom.resp.result) + } + + if !custom.resp.exception.is_empty() { + self.custom_exception = Some(custom.resp.exception) + } + + //trace info rewrite + if custom.trace.trace_id.is_some() { + self.trace_id = custom.trace.trace_id.unwrap(); + } + if custom.trace.span_id.is_some() { + self.span_id = custom.trace.span_id.unwrap(); + } + + // extend attribute + if !custom.attributes.is_empty() { + self.attributes.extend(custom.attributes); + } + } +} + impl L7ProtocolInfoInterface for HttpInfo { fn session_id(&self) -> Option { self.stream_id @@ -309,7 +366,12 @@ impl From for L7ProtocolSendLog { f.path, ) } else { - (f.method, f.path.clone(), f.host, String::new()) + ( + f.method, + f.path.clone(), + f.host, + f.custom_endpoint.unwrap_or_default(), + ) }; L7ProtocolSendLog { @@ -325,7 +387,8 @@ impl From for L7ProtocolSendLog { resp: L7Response { status: f.status, code: f.status_code, - ..Default::default() + exception: f.custom_exception.unwrap_or_default(), + result: f.custom_result.unwrap_or_default(), }, trace_info: Some(TraceInfo { trace_id: Some(f.trace_id), @@ -999,12 +1062,8 @@ impl HttpLog { PacketDirection::ClientToServer => vm.on_http_req(payload, param, info), PacketDirection::ServerToClient => vm.on_http_resp(payload, param, info), } - .map(|(trace, kv)| { - if let Some(trace) = trace { - trace.trace_id.map(|s| info.trace_id = s); - trace.span_id.map(|s| info.span_id = s); - } - info.attributes.extend(kv); + .map(|custom| { + info.merge_custom_to_http1(custom); }); } } diff --git a/agent/src/plugin/wasm/abi_import.rs b/agent/src/plugin/wasm/abi_import.rs index de6c7c5456e..5bc6d2597d2 100644 --- a/agent/src/plugin/wasm/abi_import.rs +++ b/agent/src/plugin/wasm/abi_import.rs @@ -15,16 +15,15 @@ */ use crate::{ - flow_generator::protocol_logs::pb_adapter::{KeyVal, TraceInfo}, plugin::{wasm::IMPORT_FUNC_WASM_LOG, CustomInfo}, wasm_error, }; use super::{ - read_wasm_str, StoreDataType, VmParseCtx, VmResult, IMPORT_FUNC_HOST_READ_HTTP_RESULT, - IMPORT_FUNC_HOST_READ_L7_PROTOCOL_INFO, IMPORT_FUNC_HOST_READ_STR_RESULT, - IMPORT_FUNC_VM_READ_CTX_BASE, IMPORT_FUNC_VM_READ_HTTP_REQ, IMPORT_FUNC_VM_READ_HTTP_RESP, - IMPORT_FUNC_VM_READ_PAYLOAD, LOG_LEVEL_ERR, LOG_LEVEL_INFO, LOG_LEVEL_WARN, WASM_MODULE_NAME, + read_wasm_str, StoreDataType, VmParseCtx, VmResult, IMPORT_FUNC_HOST_READ_L7_PROTOCOL_INFO, + IMPORT_FUNC_HOST_READ_STR_RESULT, IMPORT_FUNC_VM_READ_CTX_BASE, IMPORT_FUNC_VM_READ_HTTP_REQ, + IMPORT_FUNC_VM_READ_HTTP_RESP, IMPORT_FUNC_VM_READ_PAYLOAD, LOG_LEVEL_ERR, LOG_LEVEL_INFO, + LOG_LEVEL_WARN, WASM_MODULE_NAME, }; use log::{error, info, warn}; @@ -341,118 +340,11 @@ pub(super) fn host_read_l7_protocol_info( .as_mut() .unwrap() .get_ctx_base_mut() - .set_result(VmResult::ParsePayloadResult(infos)); + .set_result(VmResult::L7InfoResult(infos)); 1 } -/* - import function, host read the serialized http result and deserizlize to CustomInfo. - - correspond to go func signature: - - //go:wasm-module deepflow - //export host_read_http_result - func hostReadHttpResult(b *byte, length int) bool - - -has trace: 1 byte -if has trace - trace_id, span_id, parent_span_id - ( - str len: 2 bytes - str: $(str len) bytes - ) x 3 - -( - key len: 2 bytes - key: $(key len) bytes - - val len: 2 bytes - val: $(val len) bytes -) x n - -*/ -pub(super) fn host_read_http_result( - mut caller: Caller<'_, StoreDataType>, - b: i32, - len: i32, -) -> i32 { - if !check_memory(&mut caller, b, len, IMPORT_FUNC_HOST_READ_HTTP_RESULT) { - return 0; - } - let mut kv = vec![]; - let mem = caller.get_export("memory").unwrap().into_memory().unwrap(); - let mem = mem.data(caller.as_context()); - let data = &mem[b as usize..(b + len) as usize]; - let ins_name = caller.data().parse_ctx.as_ref().unwrap().get_ins_name(); - - let mut trace_info = None; - let mut off = 0; - let has_trace = data[off]; - off += 1; - - match has_trace { - 0 => {} - 1 => { - let mut i = TraceInfo::default(); - if read_wasm_str(data, &mut off) - .and_then(|s| { - if !s.is_empty() { - i.trace_id = Some(s); - } - read_wasm_str(data, &mut off) - }) - .and_then(|s| { - if !s.is_empty() { - i.span_id = Some(s); - } - read_wasm_str(data, &mut off) - }) - .and_then(|s| { - if !s.is_empty() { - i.parent_span_id = Some(s); - } - Some(()) - }) - .is_none() - { - wasm_error!( - ins_name, - IMPORT_FUNC_HOST_READ_HTTP_RESULT, - "read trace info fail" - ); - return 0; - } - trace_info = Some(i); - } - _ => { - wasm_error!( - ins_name, - IMPORT_FUNC_HOST_READ_HTTP_RESULT, - "read http result fail, has trace is unexpected value" - ); - return 0; - } - } - - loop { - let (Some(key), Some(val)) = (read_wasm_str(data, &mut off), read_wasm_str(data, &mut off)) else { - break; - }; - kv.push(KeyVal { key, val }); - } - - caller - .data_mut() - .parse_ctx - .as_mut() - .unwrap() - .get_ctx_base_mut() - .set_result(VmResult::HTTPResult(trace_info, kv)); - 1 -} - /* import function, host read the serialized http result and deserizlize to CustomInfo. @@ -533,13 +425,6 @@ pub(super) fn get_linker(e: Engine, store: &mut Store) -> Linker< ) .unwrap(); - link.func_wrap( - WASM_MODULE_NAME, - IMPORT_FUNC_HOST_READ_HTTP_RESULT, - host_read_http_result, - ) - .unwrap(); - link.func_wrap( WASM_MODULE_NAME, IMPORT_FUNC_HOST_READ_STR_RESULT, diff --git a/agent/src/plugin/wasm/host.rs b/agent/src/plugin/wasm/host.rs index 05efa576776..f8dceeeb032 100644 --- a/agent/src/plugin/wasm/host.rs +++ b/agent/src/plugin/wasm/host.rs @@ -30,10 +30,7 @@ use wasmtime_wasi::{WasiCtx, WasiCtxBuilder}; use crate::{ common::l7_protocol_log::ParseParam, flow_generator::{ - protocol_logs::{ - pb_adapter::{KeyVal, TraceInfo}, - HttpInfo, - }, + protocol_logs::HttpInfo, Error::{self, WasmVmError}, Result as WasmResult, }, @@ -63,7 +60,6 @@ pub(super) const IMPORT_FUNC_VM_READ_PAYLOAD: &str = "vm_read_payload"; pub(super) const IMPORT_FUNC_VM_READ_HTTP_REQ: &str = "vm_read_http_req_info"; pub(super) const IMPORT_FUNC_VM_READ_HTTP_RESP: &str = "vm_read_http_resp_info"; pub(super) const IMPORT_FUNC_HOST_READ_L7_PROTOCOL_INFO: &str = "host_read_l7_protocol_info"; -pub(super) const IMPORT_FUNC_HOST_READ_HTTP_RESULT: &str = "host_read_http_result"; pub(super) const IMPORT_FUNC_HOST_READ_STR_RESULT: &str = "host_read_str_result"; pub(super) const LOG_LEVEL_INFO: i32 = 0; @@ -244,7 +240,7 @@ impl WasmVm { .parse_ctx .as_mut() .unwrap() - .take_parse_payload_result() + .take_l7_info_result() .map_or(Some(vec![]), |info| Some(info)); break; } @@ -259,7 +255,7 @@ impl WasmVm { payload: &[u8], param: &ParseParam, info: &HttpInfo, - ) -> Option<(Option, Vec)> { + ) -> Option { if self.instance.len() == 0 { return None; } @@ -326,8 +322,8 @@ impl WasmVm { .parse_ctx .as_mut() .unwrap() - .take_http_result() - .map_or(None, |r| Some(r)); + .take_l7_info_result() + .map_or(None, |mut r| r.pop()); break; } @@ -342,7 +338,7 @@ impl WasmVm { payload: &[u8], param: &ParseParam, info: &HttpInfo, - ) -> Option<(Option, Vec)> { + ) -> Option { if self.instance.len() == 0 { return None; } @@ -409,8 +405,8 @@ impl WasmVm { .parse_ctx .as_mut() .unwrap() - .take_http_result() - .map_or(None, |r| Some(r)); + .take_l7_info_result() + .map_or(None, |mut r| r.pop()); break; } diff --git a/agent/src/plugin/wasm/mod.rs b/agent/src/plugin/wasm/mod.rs index 117e0870287..f1f87f02712 100644 --- a/agent/src/plugin/wasm/mod.rs +++ b/agent/src/plugin/wasm/mod.rs @@ -114,11 +114,10 @@ mod vm; use host::{ StoreDataType, EXPORT_FUNC_CHECK_PAYLOAD, EXPORT_FUNC_GET_HOOK_BITMAP, EXPORT_FUNC_ON_HTTP_REQ, - EXPORT_FUNC_ON_HTTP_RESP, EXPORT_FUNC_PARSE_PAYLOAD, IMPORT_FUNC_HOST_READ_HTTP_RESULT, - IMPORT_FUNC_HOST_READ_L7_PROTOCOL_INFO, IMPORT_FUNC_HOST_READ_STR_RESULT, - IMPORT_FUNC_VM_READ_CTX_BASE, IMPORT_FUNC_VM_READ_HTTP_REQ, IMPORT_FUNC_VM_READ_HTTP_RESP, - IMPORT_FUNC_VM_READ_PAYLOAD, IMPORT_FUNC_WASM_LOG, LOG_LEVEL_ERR, LOG_LEVEL_INFO, - LOG_LEVEL_WARN, WASM_MODULE_NAME, + EXPORT_FUNC_ON_HTTP_RESP, EXPORT_FUNC_PARSE_PAYLOAD, IMPORT_FUNC_HOST_READ_L7_PROTOCOL_INFO, + IMPORT_FUNC_HOST_READ_STR_RESULT, IMPORT_FUNC_VM_READ_CTX_BASE, IMPORT_FUNC_VM_READ_HTTP_REQ, + IMPORT_FUNC_VM_READ_HTTP_RESP, IMPORT_FUNC_VM_READ_PAYLOAD, IMPORT_FUNC_WASM_LOG, + LOG_LEVEL_ERR, LOG_LEVEL_INFO, LOG_LEVEL_WARN, WASM_MODULE_NAME, }; use public::bytes::read_u16_be; use vm::{VmCtxBase, VmHttpReqCtx, VmHttpRespCtx, VmParseCtx, VmResult}; diff --git a/agent/src/plugin/wasm/test.rs b/agent/src/plugin/wasm/test.rs index 66e18f57433..660917f4ba4 100644 --- a/agent/src/plugin/wasm/test.rs +++ b/agent/src/plugin/wasm/test.rs @@ -185,6 +185,11 @@ fn test_wasm_http_req() { "bbb" ); + assert_eq!(i.req.domain.as_str(), "rewrite domain"); + assert_eq!(i.req.req_type.as_str(), "rewrite req type"); + assert_eq!(i.req.resource.as_str(), "rewrite resource"); + assert_eq!(i.req.endpoint.as_str(), "rewrite endpoint"); + let attr = i.ext_info.unwrap().attributes.unwrap(); assert_eq!(attr.len(), kv.len()); @@ -239,6 +244,11 @@ fn test_wasm_http_resp() { "" ); + assert_eq!(i.resp.code.unwrap(), 599); + assert_eq!(i.resp.status, L7ResponseStatus::ServerError); + assert_eq!(i.resp.exception.as_str(), "rewrite exception"); + assert_eq!(i.resp.result, "rewrite result"); + let attr = i.ext_info.unwrap().attributes.unwrap(); assert_eq!(attr.len(), kv.len()); for i in attr { @@ -446,6 +456,7 @@ message Resp { // go wasm code, build cocmmand: // tinygo build -o wasm.wasm -target wasi -wasm-abi=generic -panic trap -scheduler=none -no-debug ./main.go + /* package main @@ -534,7 +545,7 @@ func checkEq(a, b interface{}) { } } -func (p parser) OnHttpReq(ctx *sdk.HttpReqCtx) sdk.HttpAction { +func (p parser) OnHttpReq(ctx *sdk.HttpReqCtx) sdk.Action { sdk.Warn("================ enter http req ==================") baseCtx := &ctx.BaseCtx payload, err := baseCtx.GetPayload() @@ -591,10 +602,15 @@ func (p parser) OnHttpReq(ctx *sdk.HttpReqCtx) sdk.HttpAction { SpanID: "bbb", ParentSpanID: "ccc", } - return sdk.HttpActionAbortWithResult(trace, attr) + return sdk.HttpReqActionAbortWithResult(&sdk.Request{ + ReqType: "rewrite req type", + Domain: "rewrite domain", + Resource: "rewrite resource", + Endpoint: "rewrite endpoint", + }, trace, attr) } -func (p parser) OnHttpResp(ctx *sdk.HttpRespCtx) sdk.HttpAction { +func (p parser) OnHttpResp(ctx *sdk.HttpRespCtx) sdk.Action { sdk.Warn("================ enter http resp ==================") baseCtx := &ctx.BaseCtx payload, err := baseCtx.GetPayload() @@ -609,6 +625,7 @@ func (p parser) OnHttpResp(ctx *sdk.HttpRespCtx) sdk.HttpAction { checkEq(0, int(baseCtx.L7)) checkEq(uint16(200), ctx.Code) + checkEq(sdk.RespStatusOk, ctx.Status) r := bufio.NewReader(bytes.NewReader(payload)) req, err := http.ReadResponse(r, nil) @@ -626,7 +643,14 @@ func (p parser) OnHttpResp(ctx *sdk.HttpRespCtx) sdk.HttpAction { userID := fastjson.GetInt(body, "data", "user_id") userName := fastjson.GetString(body, "data", "name") - return sdk.HttpActionAbortWithResult(nil, []sdk.KeyVal{ + var code int32 = 599 + status := sdk.RespStatusServerErr + return sdk.HttpRespActionAbortWithResult(&sdk.Response{ + Status: &status, + Code: &code, + Result: "rewrite result", + Exception: "rewrite exception", + }, nil, []sdk.KeyVal{ { Key: "user_id", Val: strconv.Itoa(userID), @@ -664,7 +688,7 @@ func (p parser) OnCheckPayload(baseCtx *sdk.ParseCtx) (uint8, string) { return 1, "test" } -func (p parser) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction { +func (p parser) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.Action { sdk.Warn("================ parse payload ==================") payload, err := baseCtx.GetPayload() if err != nil { @@ -761,6 +785,7 @@ func (p parser) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction { respLen := 9999 requestID := uint32(666) code := int32(999) + status := sdk.RespStatusOk return sdk.ParseActionAbortWithL7Info([]*sdk.L7ProtocolInfo{ { ReqLen: &reqLen, @@ -768,7 +793,7 @@ func (p parser) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction { RequestID: &requestID, Req: nil, Resp: &sdk.Response{ - Status: sdk.RespStatusOk, + Status: &status, Code: &code, Result: "result", Exception: "exception", @@ -792,7 +817,7 @@ func (p parser) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction { RequestID: &requestID, Req: nil, Resp: &sdk.Response{ - Status: sdk.RespStatusOk, + Status: &status, Code: &code, Result: "result", Exception: "exception", @@ -819,6 +844,5 @@ func (p parser) OnParsePayload(baseCtx *sdk.ParseCtx) sdk.ParseAction { func main() { sdk.Warn("wasm register parser") sdk.SetParser(parser{}) - } */ diff --git a/agent/src/plugin/wasm/vm.rs b/agent/src/plugin/wasm/vm.rs index bb198bdc534..9446d4004e8 100644 --- a/agent/src/plugin/wasm/vm.rs +++ b/agent/src/plugin/wasm/vm.rs @@ -20,8 +20,7 @@ use std::net::IpAddr; use crate::common::ebpf::EbpfType; use crate::common::flow::PacketDirection; use crate::common::l7_protocol_log::ParseParam; -use crate::flow_generator::protocol_logs::pb_adapter::{KeyVal, TraceInfo}; -use crate::flow_generator::protocol_logs::HttpInfo; +use crate::flow_generator::protocol_logs::{HttpInfo, L7ResponseStatus}; use crate::flow_generator::{Error, Result}; use crate::plugin::CustomInfo; use crate::wasm_error; @@ -34,10 +33,8 @@ use public::enums::IpProtocol; #[derive(Debug)] pub(super) enum VmResult { // result of parse_payload - ParsePayloadResult(Vec), + L7InfoResult(Vec), StringResult(String), - // result of on_http_req and on_http_resp - HTTPResult(Option, Vec), } // vm parse ctx @@ -76,9 +73,9 @@ impl VmParseCtx { self.get_ctx_base_mut().result.take() } - pub(super) fn take_parse_payload_result(&mut self) -> Option> { + pub(super) fn take_l7_info_result(&mut self) -> Option> { self.take_result().map_or(None, |r| match r { - VmResult::ParsePayloadResult(info) => Some(info), + VmResult::L7InfoResult(info) => Some(info), _ => { wasm_error!( self.get_ins_name(), @@ -89,16 +86,6 @@ impl VmParseCtx { }) } - pub(super) fn take_http_result(&mut self) -> Option<(Option, Vec)> { - self.take_result().map_or(None, |r| match r { - VmResult::HTTPResult(trace, kv) => Some((trace, kv)), - _ => { - wasm_error!(self.get_ins_name(), "http result with unexpect type",); - None - } - }) - } - pub(super) fn take_str_result(&mut self) -> Option { self.take_result().map_or(None, |r| match r { VmResult::StringResult(s) => Some(s), @@ -400,19 +387,20 @@ impl From<(&ParseParam<'_>, &HttpInfo, &[u8])> for VmHttpReqCtx { pub struct VmHttpRespCtx { pub base_ctx: VmCtxBase, pub code: u16, + pub status: L7ResponseStatus, } impl VmHttpRespCtx { /* - code: 2bytes + code: 2 bytes + status: 1 bytes */ + const BUF_SIZE: usize = 3; pub(super) fn serialize_to_bytes(&self, buf: &mut [u8]) -> Result { - let need_size = 2; - - if buf.len() < need_size { + if buf.len() < Self::BUF_SIZE { return Err(Error::WasmSerializeFail(format!( "serialize http resp ctx fail, need at lease {} bytes but buf only {} bytes", - need_size, + Self::BUF_SIZE, buf.len() ))); } @@ -420,6 +408,8 @@ impl VmHttpRespCtx { let mut off = 0; write_u16_be(buf, self.code); off += 2; + buf[off] = self.status as u8; + off += 1; Ok(off) } } @@ -430,6 +420,7 @@ impl From<(&ParseParam<'_>, &HttpInfo, &[u8])> for VmHttpRespCtx { Self { base_ctx: VmCtxBase::from((param, 0, payload)), code: info.status_code.map_or(0, |c| c as u16), + status: info.status, } } }