Merge pull request #490 from ESWZY/traffic-mgr/map-io-and-weighted-po…
…s-selection

Provide map read and write interface on the Go program side and weighted back-end pod selection
chenamy2017 authored Aug 16, 2023
2 parents ba130ab + 37b3a76 commit 9c756dd
Showing 9 changed files with 214 additions and 51 deletions.
10 changes: 10 additions & 0 deletions eBPF_Supermarket/TrafficManager/CHANGELOG.md
@@ -0,0 +1,10 @@
# Changelog

## Basic development framework and automatic compilation pipeline

- Initialize basic development framework. ([#464](https://github.com/linuxkerneltravel/lmp/pull/464), [@ESWZY](https://github.com/ESWZY))

## Kernel abstraction of Service and Pod, and maps for information storage and transfer

- Implement kernel abstraction of Service and Pod, and use maps for storage and information transfer. ([#482](https://github.com/linuxkerneltravel/lmp/pull/482), [@ESWZY](https://github.com/ESWZY))
- Provide map read and write interface on the Go program side and weighted back-end pod selection. ([#490](https://github.com/linuxkerneltravel/lmp/pull/490), [@ESWZY](https://github.com/ESWZY))
3 changes: 3 additions & 0 deletions eBPF_Supermarket/TrafficManager/Makefile
@@ -11,5 +11,8 @@ init:
cd bpf/headers/bpf/; ./update.sh
cd bpf/headers/; bpftool btf dump file /sys/kernel/btf/vmlinux format c > vmlinux.h

trace:
sudo cat /sys/kernel/debug/tracing/trace_pipe

clean:
rm main
20 changes: 20 additions & 0 deletions eBPF_Supermarket/TrafficManager/README.md
@@ -1,5 +1,12 @@
# eBPF Traffic Manager

By abstracting Kubernetes Services and Pods and rewriting network request events, this project aims to provide the following functions:

1. Resolve the Service and redirect requests directly to backend Pods, avoiding iptables NAT.
2. Filter out abnormal Pods so that requests are not sent to Pods that cannot work normally. If none of the Pods are working, reject the request.
3. Grayscale release: canary release and blue-green release. Provide cross-Service traffic modification capabilities, so that a selected subset of callers can be routed to a specific version of a service for traffic migration or version upgrade.
4. Support consistent hashing: use relevant fields (such as IP, port, protocol, etc.) for hash mapping to ensure that multiple requests from a specific source are always directed to the same backend Pod.
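
As an illustration of item 4, the following is a minimal user-space sketch of a consistent hash ring. It is not code from this project: the `ring` type, the `newRing` and `lookup` functions, the FNV-1a hash, and the replica count are all assumptions chosen for the example. It only shows the property described above, namely that the same source fields always map to the same backend.

```go
package main

import (
	"fmt"
	"hash/fnv"
	"sort"
)

// ring is a hypothetical consistent hash ring: each backend owns several
// points on a 32-bit circle, and a request goes to the first point at or
// after its own hash.
type ring struct {
	points []uint32
	owner  map[uint32]string
}

// hashKey hashes a string with FNV-1a (an arbitrary choice for this sketch).
func hashKey(s string) uint32 {
	h := fnv.New32a()
	h.Write([]byte(s))
	return h.Sum32()
}

// newRing places `replicas` points per backend on the circle.
func newRing(backends []string, replicas int) *ring {
	r := &ring{owner: make(map[uint32]string)}
	for _, b := range backends {
		for i := 0; i < replicas; i++ {
			p := hashKey(fmt.Sprintf("%s#%d", b, i))
			r.points = append(r.points, p)
			r.owner[p] = b
		}
	}
	sort.Slice(r.points, func(i, j int) bool { return r.points[i] < r.points[j] })
	return r
}

// lookup maps the source fields of a request to a backend.
// It returns "" if the ring is empty.
func (r *ring) lookup(srcIP string, srcPort uint16, proto string) string {
	if len(r.points) == 0 {
		return ""
	}
	h := hashKey(fmt.Sprintf("%s|%d|%s", srcIP, srcPort, proto))
	i := sort.Search(len(r.points), func(i int) bool { return r.points[i] >= h })
	if i == len(r.points) {
		i = 0 // wrap around the circle
	}
	return r.owner[r.points[i]]
}

func main() {
	r := newRing([]string{"10.0.0.1:80", "10.0.0.2:80", "10.0.0.3:80"}, 64)
	// The same source fields yield the same backend on every call.
	fmt.Println(r.lookup("192.168.1.10", 40000, "tcp"))
	fmt.Println(r.lookup("192.168.1.10", 40000, "tcp"))
}
```

With a ring, removing one backend only remaps the requests that were assigned to it; requests assigned to the remaining backends keep their target. An in-kernel version would need a bounded, verifier-friendly lookup, which this sketch does not attempt.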

## Install tutorial

### Ubuntu 22.04
@@ -37,3 +44,16 @@ sudo make
## Usage

Developing...

## Roadmap

Project development plan:

- [x] Build the basic development framework and automatic compilation pipeline.
- [x] Implement kernel abstraction of Service and Pod, and design corresponding maps for storage and information transfer.
- [ ] Implement cluster metadata analysis and map reads, writes, and updates in user space. Consider using a Kubernetes controller's control loop to watch for cluster changes and keep the metadata in the maps up to date.
- [ ] Performance optimization and cleanup of the development framework.
- [ ] Investigate and develop consistent hashing capabilities to achieve fast hashing and fast Pod selection.
- [ ] Investigate and develop grayscale release of traffic, such as canary release and blue-green release, which provides cross-Service traffic modification capabilities.
- [ ] Filter out specific abnormal nodes and Pods based on external cluster monitoring information.
- [ ] Documentation and tutorials.
9 changes: 6 additions & 3 deletions eBPF_Supermarket/TrafficManager/bpf/bpf_connect_bpf.go

Some generated files are not rendered by default.

Binary file not shown.
35 changes: 31 additions & 4 deletions eBPF_Supermarket/TrafficManager/bpf/connect.c
@@ -12,15 +12,15 @@ static int sock4_forward_entry(struct bpf_sock_addr *ctx)
// 0x846F070A; // 10.7.111.132
// 0x0529050A; // 10.5.41.5
// 0x0100007F; // 127.0.0.1
struct lb4_key key = {}, orig_key;

struct lb4_key key = {}, orig_key; // the key struct must be zero-initialized
struct lb4_service *svc;
struct lb4_service *backend_slot;
int backend_id = -1;
struct lb4_backend *backend;

__be32 ori_dst_ip = ctx_get_dst_ip(ctx);
__be16 ori_dst_port = ctx_get_dst_port(ctx);
bpf_printk("dest: %08x:%04x", ori_dst_ip, ori_dst_port);
key.address = ori_dst_ip,
key.dport = ori_dst_port,
key.backend_slot = 0,
@@ -30,10 +30,37 @@ static int sock4_forward_entry(struct bpf_sock_addr *ctx)
svc = lb4_lookup_service(&key);
if (!svc || svc->count == 0)
return -ENXIO;
bpf_printk("dest: %08x:%04x", ori_dst_ip, ori_dst_port);
bpf_printk("1. Service backend ID (must be zero): %d", svc->backend_id);

// 2. find backend slots from service
key.backend_slot = sock_select_random_slot(svc->count);
// TODO: provide more (lightweight) selection logic later
int keep_possibility = MAX_BACKEND_SELECTION;
for (int i = 1; i <= MAX_BACKEND_SELECTION; i++) {
if(i > svc->count)
return -ENETRESET;

key.backend_slot = i;
backend_slot = lookup_lb4_backend_slot(&key);
if (!backend_slot)
return -ENOENT;

u32 random_value = bpf_get_prandom_u32();
bpf_printk("evaluate: %d < %d ? remain: %d", random_value % keep_possibility, backend_slot->possibility, keep_possibility);

if((random_value % keep_possibility) < backend_slot->possibility) {
key.backend_slot = i;
break;
}

keep_possibility -= backend_slot->possibility;
if(keep_possibility < 0)
return -ENOENT;
}

// // 2. find backend slots from service
// key.backend_slot = sock_select_random_slot(svc->count);

bpf_printk("2. select backend from service slot: %d", key.backend_slot);

// 3. lookup backend slot from constructed backend key
@@ -63,7 +90,7 @@ SEC("cgroup/connect4")
int sock4_connect(struct bpf_sock_addr *ctx)
{
int ret = sock4_forward_entry(ctx);
if(!ret)
if(ret)
bpf_printk("skipped, not modified");
return SYS_PROCEED;
}
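
The weighted selection loop in `sock4_forward_entry` can be modeled in user space to check that it picks slots in proportion to their weights. The sketch below is an assumption-laden Go model, not the project's code: `selectSlot` and its injected random source are hypothetical, and it assumes each slot's `possibility` is an integer share of `MAX_BACKEND_SELECTION` (2048), so that 0.75 and 0.25 become 1536 and 512. How the Go side actually scales the value is not shown in this diff.

```go
package main

import (
	"fmt"
	"math/rand"
)

// total mirrors MAX_BACKEND_SELECTION from connect.h.
const total = 2048

// selectSlot walks the slots in order. At each slot it draws a value in
// [0, remaining) and picks the slot if the value is below the slot's weight;
// otherwise it removes that weight from the pool and tries the next slot.
// It returns the 1-based slot index, or 0 if no slot was picked.
// The random source is a parameter so the function can be tested.
func selectSlot(weights []uint32, next func() uint32) int {
	remaining := uint32(total)
	for i, w := range weights {
		if next()%remaining < w {
			return i + 1
		}
		// Not picked implies w < remaining, so this cannot underflow
		// and remaining stays strictly positive.
		remaining -= w
	}
	return 0
}

func main() {
	weights := []uint32{1536, 512} // assumed encoding of 0.75 and 0.25
	counts := make([]int, len(weights)+1)
	for i := 0; i < 100000; i++ {
		counts[selectSlot(weights, rand.Uint32)]++
	}
	fmt.Println("slot 1:", counts[1], "slot 2:", counts[2], "none:", counts[0])
}
```

Slot 1 is picked with probability w1/2048. Slot 2 is reached with probability (2048 - w1)/2048 and then picked with probability w2/(2048 - w1), which multiplies out to w2/2048. When the weights sum to 2048, the last slot is always picked if it is reached, so the "none" bucket stays empty.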
146 changes: 109 additions & 37 deletions eBPF_Supermarket/TrafficManager/bpf/connect.go
@@ -133,59 +133,127 @@ func (p *Programs) Close() {
_ = os.Remove(MapsPinPath)
}

func Sample() {
progs, err := LoadProgram()
func (p *Programs) DeleteServiceItem(serviceIP string, slot int) bool {
serviceKey := NewService4Key(net.ParseIP(serviceIP), 0, u8proto.ANY, 0, uint16(slot))
err := p.connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Delete(serviceKey.ToNetwork())
if err != nil {
fmt.Println("[ERROR] Loading program failed:", err)
return
fmt.Println("[ERROR] DeleteServiceItem: connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Delete failed: ", err)
return false
}
return true
}

// set service
serviceIP := "1.1.1.1"
servicePort := 80
backendNumber := 2
func (p *Programs) InsertServiceItem(serviceIP string, servicePort int, backendNumber int) {
svcKey := NewService4Key(net.ParseIP(serviceIP), uint16(servicePort), u8proto.ANY, 0, 0)
// use index 0 to indicate service item
svcValue := NewService4Value(Backend4Key{0}, uint16(backendNumber))
err = progs.connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Update(svcKey.ToNetwork(), svcValue.ToNetwork(), ebpf.UpdateAny)
// use index 0 to indicate the service item; its possibility is zero and is never used
svcValue := NewService4Value(Backend4Key{0}, uint16(backendNumber), Possibility{0})
err := p.connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Update(svcKey.ToNetwork(), svcValue.ToNetwork(), ebpf.UpdateAny)
if err != nil {
panic(err)
fmt.Println("[ERROR] InsertServiceItem: connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Update failed: ", err)
return
}
fmt.Printf("[INFO] InsertServiceItem succeeded: serviceIP: %s servicePort: %d backendNumber: %d\n", serviceIP, servicePort, backendNumber)
}

podIp1 := "1.1.1.1"
backendPort1 := 80
backendID1 := 0
slotIndex1 := 1
backendKey1 := Backend4Key{uint32(backendID1)}
backendServiceKey := NewService4Key(net.ParseIP(serviceIP), uint16(servicePort), u8proto.ANY, 0, uint16(slotIndex1))
backendServiceValue := NewService4Value(backendKey1, 0)
err = progs.connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Update(backendServiceKey.ToNetwork(), backendServiceValue.ToNetwork(), ebpf.UpdateAny)
func (p *Programs) DeleteBackendItem(backendID int) bool {
backendKey := Backend4Key{uint32(backendID)}
err := p.connectObj.bpf_connectMaps.LB4BACKEND_MAP_V2.Delete(backendKey)
if err != nil {
panic(err)
fmt.Println("[ERROR] DeleteBackendItem: connectObj.bpf_connectMaps.LB4BACKEND_MAP_V2.Delete failed:", err)
return false
}
backendValue, _ := NewBackend4Value(net.ParseIP(podIp1), uint16(backendPort1), u8proto.ANY, loadbalancer.BackendStateActive)
err = progs.connectObj.bpf_connectMaps.LB4BACKEND_MAP_V2.Update(backendKey1, backendValue.ToNetwork(), ebpf.UpdateAny)
return true
}

func (p *Programs) InsertBackendItem(serviceIP string, servicePort int, backendIP string, backendPort int, backendID int, slotIndex int, possibility float64) bool {
backendKey := Backend4Key{uint32(backendID)}
backendServiceKey := NewService4Key(net.ParseIP(serviceIP), uint16(servicePort), u8proto.ANY, 0, uint16(slotIndex))
backendServiceValue := NewService4Value(backendKey, 0, Possibility{possibility})
err := p.connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Update(backendServiceKey.ToNetwork(), backendServiceValue.ToNetwork(), ebpf.UpdateAny)
if err != nil {
panic(err)
fmt.Println("[ERROR] InsertBackendItem: connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Update failed:", err)
return false
}

// python3 -m http.server 8888
podIp2 := "127.0.0.1"
backendPort2 := 8888
backendID2 := 1
slotIndex2 := 2
backendKey2 := Backend4Key{uint32(backendID2)}
backendServiceKey2 := NewService4Key(net.ParseIP(serviceIP), uint16(servicePort), u8proto.ANY, 0, uint16(slotIndex2))
backendServiceValue2 := NewService4Value(backendKey2, 0)
err = progs.connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Update(backendServiceKey2.ToNetwork(), backendServiceValue2.ToNetwork(), ebpf.UpdateAny)
backendValue, _ := NewBackend4Value(net.ParseIP(backendIP), uint16(backendPort), u8proto.ANY, loadbalancer.BackendStateActive)
err = p.connectObj.bpf_connectMaps.LB4BACKEND_MAP_V2.Update(backendKey, backendValue.ToNetwork(), ebpf.UpdateAny)
if err != nil {
panic(err)
fmt.Println("[ERROR] InsertBackendItem: connectObj.bpf_connectMaps.LB4BACKEND_MAP_V2.Update failed:", err)
return false
}
backendValue2, _ := NewBackend4Value(net.ParseIP(podIp2), uint16(backendPort2), u8proto.ANY, loadbalancer.BackendStateActive)
err = progs.connectObj.bpf_connectMaps.LB4BACKEND_MAP_V2.Update(backendKey2, backendValue2.ToNetwork(), ebpf.UpdateAny)

fmt.Printf("[INFO] InsertBackendItem succeeded: serviceIP: %s servicePort: %d backendID: %d slotIndex: %d possibility: %.2f\n", serviceIP, servicePort, backendID, slotIndex, possibility)
return true
}

// AutoInsertService inserts an organized service item into map
// TODO: implement it
func (p *Programs) AutoInsertService(serviceIP string, servicePort int, backendNumber int) {
p.InsertServiceItem(serviceIP, servicePort, backendNumber)
}

// AutoDeleteService deletes an organized service item with backend items from map
func (p *Programs) AutoDeleteService(serviceIP string, servicePort int) bool {
serviceKey := NewService4Key(net.ParseIP(serviceIP), uint16(servicePort), u8proto.ANY, 0, 0)
serviceValue := NewService4Value(Backend4Key{0}, uint16(0), Possibility{0})
err := p.connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Lookup(serviceKey.ToNetwork(), serviceValue)
if err != nil {
panic(err)
fmt.Println("[ERROR] AutoDeleteService: connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Lookup failed:", err)
return false
}
fmt.Println("[DEBUG] To delete service", serviceValue)
p.DeleteServiceItem(serviceIP, 0)
for i := 0; i < int(serviceValue.Count); i++ {
backendServiceKey := NewService4Key(net.ParseIP(serviceIP), 0, u8proto.ANY, 0, uint16(i+1))
backendServiceValue := NewService4Value(Backend4Key{uint32(0)}, 0, Possibility{0})
err := p.connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Lookup(backendServiceKey.ToNetwork(), backendServiceValue)
if err != nil {
fmt.Println("[WARNING] AutoDeleteService: connectObj.bpf_connectMaps.LB4SERVICES_MAP_V2.Lookup failed:", err)
break
}
fmt.Println("[DEBUG] To delete backend:", backendServiceValue)
p.AutoDeleteBackend(int(backendServiceValue.BackendID.ID))
p.DeleteServiceItem(serviceIP, i+1)
}
fmt.Printf("[INFO] AutoDeleteService succeeded: serviceIP: %s servicePort: %d\n", serviceIP, servicePort)
return true
}

// AutoInsertBackend inserts an organized backend item into map
func (p *Programs) AutoInsertBackend(serviceIP string, servicePort int, backendIP string, backendPort int, slotIndex int, possibility float64) {
backendID := p.currentIndex
p.currentIndex++
ok := p.InsertBackendItem(serviceIP, servicePort, backendIP, backendPort, backendID, slotIndex, possibility)
if ok {
p.backEndSet[backendID] = true
}
}

// AutoDeleteBackend deletes a backend item from map
func (p *Programs) AutoDeleteBackend(backendID int) bool {
delete(p.backEndSet, backendID)
ok := p.DeleteBackendItem(backendID)
if ok {
fmt.Println("[INFO] AutoDeleteBackend succeeded: backendID:", backendID)
}
return ok
}

func Sample() {
progs, err := LoadProgram()
if err != nil {
fmt.Println("[ERROR] Loading program failed:", err)
return
}

// set service
serviceIP := "1.1.1.1"
servicePort := 80 // TODO: 0 means it will not be modified
backendPort1 := 80
backendPort2 := 8888
progs.InsertServiceItem(serviceIP, servicePort, 2)
progs.AutoInsertBackend(serviceIP, servicePort, "1.1.1.1", backendPort1, 1, 0.75)
progs.AutoInsertBackend(serviceIP, servicePort, "127.0.0.1", backendPort2, 2, 0.25)

err = progs.Attach()
if err != nil {
@@ -195,4 +263,8 @@ func Sample() {

time.Sleep(time.Minute)
fmt.Println("[INFO] Time is up...")
progs.AutoDeleteService("1.1.1.1", servicePort)
//c := make(chan os.Signal, 1)
//signal.Notify(c)
//<-c
}
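
The way the service map and the backend map relate can be sketched with plain Go maps. This is a simplified in-memory model under stated assumptions, not the project's API: `tables`, `svcKey`, `svcValue`, and the method names are hypothetical, and no eBPF map is involved. Slot 0 of a service holds the backend count, and slots 1..n each point to a backend ID with a weight, as in `Sample()` above. The model keys every entry of a service with the same port; note that `DeleteServiceItem` in this diff builds its key with port 0, which may or may not be intended.

```go
package main

import "fmt"

// svcKey is a simplified stand-in for the service map key.
type svcKey struct {
	IP   string
	Port uint16
	Slot uint16 // 0 = service entry, 1..n = backend slots
}

// svcValue is a simplified stand-in for the service map value.
type svcValue struct {
	BackendID uint32
	Count     uint16  // only meaningful in slot 0
	Weight    float64 // only meaningful in slots 1..n
}

type backend struct {
	IP   string
	Port uint16
}

// tables models the two maps plus a backend ID counter.
type tables struct {
	services map[svcKey]svcValue
	backends map[uint32]backend
	nextID   uint32
}

func newTables() *tables {
	return &tables{services: map[svcKey]svcValue{}, backends: map[uint32]backend{}}
}

// insertService writes the slot-0 entry that records the backend count.
func (t *tables) insertService(ip string, port, count uint16) {
	t.services[svcKey{ip, port, 0}] = svcValue{Count: count}
}

// insertBackend allocates an ID, writes the slot entry and the backend entry.
func (t *tables) insertBackend(ip string, port, slot uint16, b backend, weight float64) uint32 {
	id := t.nextID
	t.nextID++
	t.services[svcKey{ip, port, slot}] = svcValue{BackendID: id, Weight: weight}
	t.backends[id] = b
	return id
}

// deleteService removes every slot of the service and the backends they
// reference, then the slot-0 entry. It reports whether the service existed.
func (t *tables) deleteService(ip string, port uint16) bool {
	svc, ok := t.services[svcKey{ip, port, 0}]
	if !ok {
		return false
	}
	for slot := 1; slot <= int(svc.Count); slot++ {
		k := svcKey{ip, port, uint16(slot)}
		if v, found := t.services[k]; found {
			delete(t.backends, v.BackendID)
			delete(t.services, k)
		}
	}
	delete(t.services, svcKey{ip, port, 0})
	return true
}

func main() {
	t := newTables()
	t.insertService("1.1.1.1", 80, 2)
	t.insertBackend("1.1.1.1", 80, 1, backend{"1.1.1.1", 80}, 0.75)
	t.insertBackend("1.1.1.1", 80, 2, backend{"127.0.0.1", 8888}, 0.25)
	fmt.Println("entries:", len(t.services), "backends:", len(t.backends))
	t.deleteService("1.1.1.1", 80)
	fmt.Println("entries:", len(t.services), "backends:", len(t.backends))
}
```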
14 changes: 14 additions & 0 deletions eBPF_Supermarket/TrafficManager/bpf/connect.h
@@ -38,6 +38,14 @@
#define LB_SERVICE_MAP_MAX_ENTRIES 65536
#define LB_BACKENDS_MAP_MAX_ENTRIES 65536

/* Lookup scope for externalTrafficPolicy=Local */
#define LB_LOOKUP_SCOPE_EXT 0
#define LB_LOOKUP_SCOPE_INT 1

#define CONDITIONAL_PREALLOC 0

#define MAX_BACKEND_SELECTION 2048

// sudo cat /sys/kernel/debug/tracing/trace_pipe

#define print_ip_formatted(ip) \
@@ -60,6 +68,9 @@ struct lb4_key {
struct lb4_service {
__u32 backend_id; /* Backend ID in lb4_backends */
__u16 count;
__u16 possibility;
__u8 flags; // TODO: timeout flag
__u8 flags2;
__u8 pad[2];
};

@@ -76,6 +87,7 @@ struct {
__type(value, struct lb4_service);
__uint(pinning, LIBBPF_PIN_BY_NAME);
__uint(max_entries, LB_SERVICE_MAP_MAX_ENTRIES);
__uint(map_flags, CONDITIONAL_PREALLOC);
} LB4_SERVICES_MAP_V2 SEC(".maps");

struct {
@@ -84,6 +96,7 @@ struct {
__type(value, struct lb4_backend);
__uint(pinning, LIBBPF_PIN_BY_NAME);
__uint(max_entries, LB_BACKENDS_MAP_MAX_ENTRIES);
__uint(map_flags, CONDITIONAL_PREALLOC);
} LB4_BACKEND_MAP_V2 SEC(".maps");

static __always_inline __be32 ctx_get_dst_ip(const struct bpf_sock_addr *ctx)
@@ -118,6 +131,7 @@ static __always_inline struct lb4_service *lb4_lookup_service(struct lb4_key *ke
{
struct lb4_service *svc;

key->scope = LB_LOOKUP_SCOPE_EXT;
svc = bpf_map_lookup_elem(&LB4_SERVICES_MAP_V2, key);
if (svc)
return svc;