Skip to content

Commit

Permalink
feat(core/containerd): refactor containerd interaction to v2 APIs
Browse files Browse the repository at this point in the history
* Move from v1 to v2 APIs
* Use Event Driven Approach
* Watch for tasks instead of container events, this will prevent mismanagement of events and policies on KubeArmor end incase of reuse of PIDNS/MNTNS by contaienr runtime, since we delete container information as soon as the root task exits

Signed-off-by: daemon1024 <[email protected]>
  • Loading branch information
daemon1024 committed Dec 19, 2024
1 parent dc0bb33 commit 5b5b3ec
Show file tree
Hide file tree
Showing 5 changed files with 427 additions and 251 deletions.
197 changes: 102 additions & 95 deletions KubeArmor/core/containerdHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ import (
"path/filepath"
"strconv"
"strings"
"time"

"github.com/containerd/typeurl/v2"
"google.golang.org/protobuf/proto"

"golang.org/x/exp/slices"

Expand All @@ -24,12 +24,14 @@ import (
"github.com/kubearmor/KubeArmor/KubeArmor/state"
tp "github.com/kubearmor/KubeArmor/KubeArmor/types"

pb "github.com/containerd/containerd/api/services/containers/v1"
pt "github.com/containerd/containerd/api/services/tasks/v1"
"github.com/containerd/containerd/namespaces"
"google.golang.org/grpc"
"github.com/containerd/containerd/v2/core/events"

specs "github.com/opencontainers/runtime-spec/specs-go"

apievents "github.com/containerd/containerd/api/events"
task "github.com/containerd/containerd/api/services/tasks/v1"
v2 "github.com/containerd/containerd/v2/client"
"github.com/containerd/containerd/v2/pkg/namespaces"
)

// ======================== //
Expand Down Expand Up @@ -73,60 +75,51 @@ func init() {

// ContainerdHandler Structure
type ContainerdHandler struct {
// connection
conn *grpc.ClientConn

// container client
client pb.ContainersClient

// task client
taskClient pt.TasksClient
client *v2.Client

// context
containerd context.Context
docker context.Context

// active containers
containers map[string]context.Context
k8sEventsCh <-chan *events.Envelope
dockerEventsCh <-chan *events.Envelope
}

// NewContainerdHandler Function
func NewContainerdHandler() *ContainerdHandler {
ch := &ContainerdHandler{}

conn, err := grpc.Dial(cfg.GlobalCfg.CRISocket, grpc.WithInsecure())
// Establish connection to containerd
client, err := v2.New(strings.TrimPrefix(cfg.GlobalCfg.CRISocket, "unix://"))
if err != nil {
kg.Errf("Unable to connect to containerd v2: %v", err)
return nil
}
ch.client = client

ch.conn = conn

// container client
ch.client = pb.NewContainersClient(ch.conn)

// task client
ch.taskClient = pt.NewTasksClient(ch.conn)
// Subscribe to containerd events

// docker namespace
ch.docker = namespaces.WithNamespace(context.Background(), "moby")

dockerEventsCh, _ := client.EventService().Subscribe(ch.docker, "")
ch.dockerEventsCh = dockerEventsCh

// containerd namespace
ch.containerd = namespaces.WithNamespace(context.Background(), "k8s.io")

// active containers
ch.containers = map[string]context.Context{}

kg.Print("Initialized Containerd Handler")
k8sEventsCh, _ := client.EventService().Subscribe(ch.containerd, "")
ch.k8sEventsCh = k8sEventsCh

return ch
}

// Close Function
func (ch *ContainerdHandler) Close() {
if ch.conn != nil {
if err := ch.conn.Close(); err != nil {
kg.Err(err.Error())
}
if err := ch.client.Close(); err != nil {
kg.Err(err.Error())
}
}

Expand All @@ -136,22 +129,28 @@ func (ch *ContainerdHandler) Close() {

// GetContainerInfo Function
func (ch *ContainerdHandler) GetContainerInfo(ctx context.Context, containerID string, OwnerInfo map[string]tp.PodOwner) (tp.Container, error) {
req := pb.GetContainerRequest{ID: containerID}
res, err := ch.client.Get(ctx, &req)
res, err := ch.client.ContainerService().Get(ctx, containerID)
if err != nil {
return tp.Container{}, err
}

// skip if pause container
if res.Labels != nil {
if containerKind, ok := res.Labels["io.cri-containerd.kind"]; ok && containerKind == "sandbox" {
return tp.Container{}, fmt.Errorf("pause container")
}
}

container := tp.Container{}

// == container base == //

container.ContainerID = res.Container.ID
container.ContainerName = res.Container.ID
container.ContainerID = res.ID
container.ContainerName = res.ID
container.NamespaceName = "Unknown"
container.EndPointName = "Unknown"

containerLabels := res.Container.Labels
containerLabels := res.Labels
if _, ok := containerLabels["io.kubernetes.pod.namespace"]; ok { // kubernetes
if val, ok := containerLabels["io.kubernetes.pod.namespace"]; ok {
container.NamespaceName = val
Expand All @@ -171,7 +170,7 @@ func (ch *ContainerdHandler) GetContainerInfo(ctx context.Context, containerID s
}
}

iface, err := typeurl.UnmarshalAny(res.Container.Spec)
iface, err := typeurl.UnmarshalAny(res.Spec)
if err != nil {
return tp.Container{}, err
}
Expand All @@ -186,14 +185,16 @@ func (ch *ContainerdHandler) GetContainerInfo(ctx context.Context, containerID s

// == //

taskReq := pt.ListPidsRequest{ContainerID: container.ContainerID}
if taskRes, err := Containerd.taskClient.ListPids(ctx, &taskReq); err == nil {
taskReq := task.ListPidsRequest{ContainerID: container.ContainerID}
if taskRes, err := ch.client.TaskService().ListPids(ctx, &taskReq); err == nil {
if len(taskRes.Processes) == 0 {
return container, err
}

pid := strconv.Itoa(int(taskRes.Processes[0].Pid))

container.Pid = taskRes.Processes[0].Pid

if data, err := os.Readlink(filepath.Join(cfg.GlobalCfg.ProcFsMount, pid, "/ns/pid")); err == nil {
if _, err := fmt.Sscanf(data, "pid:[%d]\n", &container.PidNS); err != nil {
kg.Warnf("Unable to get PidNS (%s, %s, %s)", containerID, pid, err.Error())
Expand All @@ -212,12 +213,12 @@ func (ch *ContainerdHandler) GetContainerInfo(ctx context.Context, containerID s
// == //

if !cfg.GlobalCfg.K8sEnv {
container.ContainerImage = res.Container.Image //+ kl.GetSHA256ofImage(inspect.Image)
container.ContainerImage = res.Image //+ kl.GetSHA256ofImage(inspect.Image)

container.NodeName = cfg.GlobalCfg.Host

labels := []string{}
for k, v := range res.Container.Labels {
for k, v := range res.Labels {
labels = append(labels, k+"="+v)
}
for k, v := range spec.Annotations {
Expand Down Expand Up @@ -246,52 +247,25 @@ func (ch *ContainerdHandler) GetContainerInfo(ctx context.Context, containerID s
func (ch *ContainerdHandler) GetContainerdContainers() map[string]context.Context {
containers := map[string]context.Context{}

req := pb.ListContainersRequest{}

if containerList, err := ch.client.List(ch.docker, &req, grpc.MaxCallRecvMsgSize(kl.DefaultMaxRecvMaxSize)); err == nil {
for _, container := range containerList.Containers {
if containerList, err := ch.client.ContainerService().List(ch.docker); err == nil {
for _, container := range containerList {
containers[container.ID] = ch.docker
}
} else {
kg.Err(err.Error())
}

if containerList, err := ch.client.List(ch.containerd, &req, grpc.MaxCallRecvMsgSize(kl.DefaultMaxRecvMaxSize)); err == nil {
for _, container := range containerList.Containers {
if containerList, err := ch.client.ContainerService().List(ch.containerd); err == nil {
for _, container := range containerList {
containers[container.ID] = ch.containerd
}
} else {
kg.Err(err.Error())
}

return containers
}

// GetNewContainerdContainers Function
func (ch *ContainerdHandler) GetNewContainerdContainers(containers map[string]context.Context) map[string]context.Context {
newContainers := map[string]context.Context{}

for activeContainerID, context := range containers {
if _, ok := ch.containers[activeContainerID]; !ok {
newContainers[activeContainerID] = context
}
}

return newContainers
}

// GetDeletedContainerdContainers Function
func (ch *ContainerdHandler) GetDeletedContainerdContainers(containers map[string]context.Context) map[string]context.Context {
deletedContainers := map[string]context.Context{}

for globalContainerID := range ch.containers {
if _, ok := containers[globalContainerID]; !ok {
deletedContainers[globalContainerID] = context.TODO()
delete(ch.containers, globalContainerID)
}
}

ch.containers = containers

return deletedContainers
}

// UpdateContainerdContainer Function
func (dm *KubeArmorDaemon) UpdateContainerdContainer(ctx context.Context, containerID, action string) bool {
// check if Containerd exists
Expand All @@ -303,6 +277,7 @@ func (dm *KubeArmorDaemon) UpdateContainerdContainer(ctx context.Context, contai
// get container information from containerd client
container, err := Containerd.GetContainerInfo(ctx, containerID, dm.OwnerInfo)
if err != nil {
kg.Err(err.Error())
return false
}

Expand Down Expand Up @@ -565,38 +540,70 @@ func (dm *KubeArmorDaemon) MonitorContainerdEvents() {

dm.Logger.Print("Started to monitor Containerd events")

containers := Containerd.GetContainerdContainers()

if len(containers) > 0 {
for containerID, context := range containers {
if !dm.UpdateContainerdContainer(context, containerID, "start") {
continue
}
}
}
for {
select {
case <-StopChan:
return

default:
containers := Containerd.GetContainerdContainers()
case envelope := <-Containerd.k8sEventsCh:
dm.handleContainerdEvent(envelope, Containerd.containerd)

invalidContainers := []string{}
case envelope := <-Containerd.dockerEventsCh:
dm.handleContainerdEvent(envelope, Containerd.docker)

newContainers := Containerd.GetNewContainerdContainers(containers)
deletedContainers := Containerd.GetDeletedContainerdContainers(containers)
}
}
}

if len(newContainers) > 0 {
for containerID, context := range newContainers {
if !dm.UpdateContainerdContainer(context, containerID, "start") {
invalidContainers = append(invalidContainers, containerID)
}
}
}
func (dm *KubeArmorDaemon) handleContainerdEvent(envelope *events.Envelope, context context.Context) {
if envelope == nil {
return
}

for _, invalidContainerID := range invalidContainers {
delete(Containerd.containers, invalidContainerID)
}
// Handle the different event types
switch envelope.Topic {
case "/containers/delete":
deleteContainer := &apievents.ContainerDelete{}

if len(deletedContainers) > 0 {
for containerID, context := range deletedContainers {
dm.UpdateContainerdContainer(context, containerID, "destroy")
}
}
err := proto.Unmarshal(envelope.Event.GetValue(), deleteContainer)
if err != nil {
kg.Errf("failed to unmarshal container's delete event: %v", err)
}
dm.UpdateContainerdContainer(context, deleteContainer.GetID(), "destroy")

case "/tasks/start":
startTask := &apievents.TaskStart{}

err := proto.Unmarshal(envelope.Event.GetValue(), startTask)
if err != nil {
kg.Errf("failed to unmarshal container's start task: %v", err)
}
dm.UpdateContainerdContainer(context, startTask.GetContainerID(), "start")

case "/tasks/exit":
exitTask := &apievents.TaskStart{}

err := proto.Unmarshal(envelope.Event.GetValue(), exitTask)
if err != nil {
kg.Errf("failed to unmarshal container's exit task: %v", err)
}

dm.ContainersLock.RLock()
pid := dm.Containers[exitTask.GetContainerID()].Pid
dm.ContainersLock.RUnlock()

if pid == exitTask.GetPid() {
dm.UpdateContainerdContainer(context, exitTask.GetContainerID(), "destroy")
}

time.Sleep(time.Millisecond * 500)
}
}
3 changes: 2 additions & 1 deletion KubeArmor/core/dockerHandler.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import (
"time"

"github.com/docker/docker/api/types"
"github.com/docker/docker/api/types/container"
"github.com/docker/docker/api/types/events"
"github.com/docker/docker/client"

Expand Down Expand Up @@ -267,7 +268,7 @@ func (dm *KubeArmorDaemon) GetAlreadyDeployedDockerContainers() {
}
}

if containerList, err := Docker.DockerClient.ContainerList(context.Background(), types.ContainerListOptions{}); err == nil {
if containerList, err := Docker.DockerClient.ContainerList(context.Background(), container.ListOptions{}); err == nil {
for _, dcontainer := range containerList {
// get container information from docker client
container, err := Docker.GetContainerInfo(dcontainer.ID, dm.OwnerInfo)
Expand Down
Loading

0 comments on commit 5b5b3ec

Please sign in to comment.