Skip to content

Commit

Permalink
feat(obs): change parallelized file system to obs Bucket
Browse files Browse the repository at this point in the history
  • Loading branch information
Zippo-Wang committed Aug 16, 2024
1 parent b082aad commit 48b456b
Show file tree
Hide file tree
Showing 13 changed files with 227 additions and 154 deletions.
5 changes: 1 addition & 4 deletions cluster/images/obs-csi-plugin/Dockerfile
Original file line number Diff line number Diff line change
Expand Up @@ -5,16 +5,13 @@ LABEL description="HuaweiCloud CSI Plugin"

WORKDIR /obs-csi

COPY obsfs_CentOS7.6_amd64.tar.gz ./
COPY obsfs_Ubuntu16.04_amd64.tar.gz ./
COPY entrypoint.sh entrypoint.sh
COPY nsenter /nsenter
COPY csi-connector-server ./
COPY csi-connector.service ./
COPY obs-csi-plugin obs-csi-plugin
COPY install_obsfs.sh ./
COPY install_s3fs.sh ./
COPY stop-server.sh ./
COPY huaweicloud-obs-obsfs.tar.gz ./

RUN chmod +x obs-csi-plugin
RUN chmod +x entrypoint.sh
Expand Down
31 changes: 16 additions & 15 deletions cluster/images/obs-csi-plugin/csi-connector-server.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ const (
region = "region"
cloud = "cloud"
credential = "credential"
defaultOpts = "-o big_writes -o max_write=131072 -o use_ino"
defaultOpts = "-o nonempty -o big_writes -o max_write=131072"
)

type ResponseBody struct {
Expand Down Expand Up @@ -119,7 +119,7 @@ func main() {
//nolint:errcheck
flag.CommandLine.Parse([]string{})

initObsfsUtil()
initS3fsUtil()
if checkFileExists(obs.SocketPath) {
if err := os.Remove(obs.SocketPath); err != nil {
log.Fatalf("Failed to remove path: %s, err: %v", obs.SocketPath, err)
Expand Down Expand Up @@ -153,26 +153,27 @@ func mountHandler(parameters map[string]string) error {
if parameters[cloud] == "prod-cloud-ocb.orange-business.com" {
obsName = "oss"
}

mountOpts := parameters[mountFlags]
if mountOpts == "" {
mountOpts = defaultOpts
}

options := []string{
"obsfs",
parameters[bucketName],
parameters[targetPath],
fmt.Sprintf("-o url=%s.%s.%s", obsName, parameters[region], parameters[cloud]),
fmt.Sprintf("-o passwd_file=%s", credentialFile),
mountOpts,
"-o",
fmt.Sprintf("url=https://%s.%s.%s", obsName, parameters[region], parameters[cloud]),
"-o",
fmt.Sprintf("passwd_file=%s", credentialFile),
}
mntOptsArr := strings.Split(mountOpts, " ")
options = append(options, mntOptsArr...)

cmd := exec.Command("sh", "-c")
cmd.Args = append(cmd.Args, strings.Join(options, " "))
cmd := exec.Command("s3fs", options...)
out, err := cmd.CombinedOutput()
if err != nil {
log.Errorf("failed to mount CMD: obsfs %s, output: %s, error: %v", strings.Join(options, " "), string(out), err)
return fmt.Errorf("failed to mount CMD: obsfs %s, output: %s, error: %v", strings.Join(options, " "), string(out), err)
log.Errorf("failed to mount bucket to node, CMD: s3fs %s, output: %s, error: %v", strings.Join(options, " "), string(out), err)
return fmt.Errorf("failed to mount bucket to node, CMD: s3fs %s, output: %s, error: %v", strings.Join(options, " "), string(out), err)
}
log.Infof("success to mount CMD: %s", strings.Join(options, " "))
return nil
Expand All @@ -186,11 +187,11 @@ func deleteCredential(credential string) {
}
}

func initObsfsUtil() {
cmd := fmt.Sprintf("sh %s/install_obsfs.sh >> %s/connector.log 2>&1 &", credentialDir, credentialDir)
func initS3fsUtil() {
cmd := fmt.Sprintf("sh %s/install_s3fs.sh > %s/connector.log 2>&1 &", credentialDir, credentialDir)
out, err := exec.Command("sh", "-c", cmd).CombinedOutput()
log.Infof("install obsfs %s", string(out))
log.Infof("install s3fs %s", string(out))
if err != nil {
log.Errorf("error install obsfs: %s", err)
log.Errorf("error install s3fs: %s", err)
}
}
5 changes: 1 addition & 4 deletions cluster/images/obs-csi-plugin/entrypoint.sh
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,7 @@
HOST_CMD="/nsenter --mount=/proc/1/ns/mnt"
mkdir -p /var/lib/csi/

cp -f /obs-csi/huaweicloud-obs-obsfs.tar.gz /var/lib/csi/huaweicloud-obs-obsfs.tar.gz
cp -f /obs-csi/obsfs_CentOS7.6_amd64.tar.gz /var/lib/csi/obsfs_CentOS7.6_amd64.tar.gz
cp -f /obs-csi/obsfs_Ubuntu16.04_amd64.tar.gz /var/lib/csi/obsfs_Ubuntu16.04_amd64.tar.gz
cp -f /obs-csi/install_obsfs.sh /var/lib/csi/install_obsfs.sh
cp -f /obs-csi/install_s3fs.sh /var/lib/csi/install_s3fs.sh

echo "Starting install obs csi-connector-server...."
$HOST_CMD systemctl stop csi-connector.service
Expand Down
Binary file not shown.
96 changes: 0 additions & 96 deletions cluster/images/obs-csi-plugin/install_obsfs.sh

This file was deleted.

111 changes: 111 additions & 0 deletions cluster/images/obs-csi-plugin/install_s3fs.sh
Original file line number Diff line number Diff line change
@@ -0,0 +1,111 @@
#!/bin/bash

packageCmd=("yum" "dnf" "apt")

checkS3fsInstalled() {
echo "[INFO] Check version and check if it works"
isInstalled=1
command -v s3fs >/dev/null 2>&1 || { isInstalled=0; }
if [ ${isInstalled} = 0 ]; then
echo "[WARN] has no command named s3fs"
return ${isInstalled}
fi
echo "[INFO] s3fs has been installed"
return ${isInstalled}
}

checkLinuxOS() {
if grep -i -q "EulerOS" /etc/os-release; then
# EulerOS
return 5
elif [ -f "/etc/redhat-release" ]; then
# CentOS or RHEL or EulerOS
if grep -i -q "CentOS" /etc/redhat-release; then
return 1
elif grep -i -q "Fedora" /etc/redhat-release; then
return 4
elif grep -i -q "EulerOS" /etc/redhat-release; then
return 5
fi
elif [ -f "/etc/issue" ]; then
# Ubuntu or Debian or Euler
if grep -i -q "ubuntu" /etc/issue; then
return 2
elif grep -i -q "debian" /etc/issue; then
return 3
elif grep -i -q "Euler" /etc/os-release; then
return 5
fi
elif [ -f "/etc/fedora-release" ]; then
# Fedora
if grep -i -q "Fedora" /etc/fedora-release; then
return 4
fi
else
# unknown OS
return 0
fi
}

installS3fs() {
echo "[INFO] Install dependencies and s3fs"
pgkManageCmd=${1}

if which yum >/dev/null 2>&1 && [ ${pgkManageCmd} = "yum" ]; then
echo "[INFO] command 'yum' is available, trying to install s3fs with yum"
sudo yum -y install epel-release
sudo yum -y install s3fs-fuse
return
elif which apt >/dev/null 2>&1 && [ ${pgkManageCmd} = "apt" ]; then
echo "[INFO] command 'apt' is available, trying to install s3fs with apt"
sudo apt -y install s3fs
return
elif which dnf >/dev/null 2>&1 && [ ${pgkManageCmd} = "dnf" ]; then
echo "[INFO] command 'dnf' is available, trying to install s3fs with dnf"
sudo dnf -y install s3fs-fuse
return
else
echo "[ERROR] unsupported systems, please install s3fs manually"
fi
}

# pre install
checkS3fsInstalled
isInstalled=$?
if [ ${isInstalled} = 1 ]; then
exit 1
fi

# install
checkLinuxOS
osCode=$?
echo "[INFO] OS Code is ${osCode}"

if [ ${osCode} = 1 ] || [ ${osCode} = 5 ]; then
echo "[INFO] OS is CentOS or EulerOS, use yum to install s3fs"
installS3fs "yum"
elif [ ${osCode} = 2 ] || [ ${osCode} = 3 ]; then
echo "[INFO] OS is Ubuntu or Debian, use apt to install s3fs"
installS3fs "apt"
elif [ ${osCode} = 4 ]; then
echo "[INFO] OS is Fedora, use dnf to install s3fs"
installS3fs "dnf"
else
echo "[WARN] Unknown OS, try to force installation of s3fs"
for cmd in ${packageCmd[@]}; do
installS3fs ${cmd}
checkS3fsInstalled
isInstalled=$?
if [ ${isInstalled} = 1 ]; then
exit 1
fi
done
fi

# post install
checkS3fsInstalled
isInstalled=$?
if [ ${isInstalled} = 1 ]; then
exit 1
fi
echo "[ERROR] failed to install s3fs, please install s3fs manually"
Binary file not shown.
Binary file not shown.
12 changes: 6 additions & 6 deletions pkg/obs/controllerserver.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (cs *controllerServer) CreateVolume(_ context.Context, req *csi.CreateVolum
return nil, err
}

if volume, err := services.GetParallelFSBucket(credentials, volName); err != nil && status.Code(err) != codes.NotFound {
if volume, err := services.GetObsBucket(credentials, volName); err != nil && status.Code(err) != codes.NotFound {
return nil, err
} else if volume != nil {
log.Infof("Volume %s existence, skip creating", volName)
Expand All @@ -61,7 +61,7 @@ func (cs *controllerServer) CreateVolume(_ context.Context, req *csi.CreateVolum
}
}

volume, err := services.GetParallelFSBucket(credentials, volName)
volume, err := services.GetObsBucket(credentials, volName)
if err != nil {
return nil, err
}
Expand All @@ -80,7 +80,7 @@ func (cs *controllerServer) DeleteVolume(_ context.Context, req *csi.DeleteVolum
}

credentials := cs.Driver.cloud
volume, err := services.GetParallelFSBucket(credentials, volName)
volume, err := services.GetObsBucket(credentials, volName)
if err != nil {
if common.IsNotFound(err) {
log.Infof("Volume %s does not exist, skip deleting", volName)
Expand Down Expand Up @@ -115,7 +115,7 @@ func (cs *controllerServer) ControllerGetVolume(_ context.Context, req *csi.Cont
return nil, status.Error(codes.InvalidArgument, "Validation failed, volume ID cannot be empty")
}

bucket, err := services.GetParallelFSBucket(cs.Driver.cloud, volumeID)
bucket, err := services.GetObsBucket(cs.Driver.cloud, volumeID)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -216,7 +216,7 @@ func (cs *controllerServer) ValidateVolumeCapabilities(_ context.Context, req *c
if len(volumeID) == 0 {
return nil, status.Error(codes.InvalidArgument, "Validation failed, volume ID cannot be empty")
}
if _, err := services.GetParallelFSBucket(cs.Driver.cloud, volumeID); err != nil {
if _, err := services.GetObsBucket(cs.Driver.cloud, volumeID); err != nil {
return nil, err
}

Expand Down Expand Up @@ -263,7 +263,7 @@ func (cs *controllerServer) ControllerExpandVolume(_ context.Context, req *csi.C
"Validation failed, after round-up volume size %v exceeds the max size %v", sizeBytes, maxSizeBytes)
}

volume, err := services.GetParallelFSBucket(cc, volumeID)
volume, err := services.GetObsBucket(cc, volumeID)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/obs/mount.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,14 @@ func sendCommand(cmd CommandRPC, mountClient http.Client) error {
if err != nil {
return err
}
log.Infof("Start sending command: %s", string(marshal))
response, err := mountClient.Post("http://unix", "application/json", bytes.NewReader(marshal))
if err != nil {
return status.Errorf(codes.Internal, "Failed to post command, err: %v", err)
}
defer response.Body.Close()
if response.StatusCode != http.StatusOK {
respBody, err := ioutil.ReadAll(response.Body)
log.Infof("start to mount bucket, cmd: %v", string(respBody))
if err != nil {
return status.Errorf(codes.Internal, "Failed to read responseBody, err: %v", err)
}
Expand Down
Loading

0 comments on commit 48b456b

Please sign in to comment.