Skip to content

Commit

Permalink
[INLONG-11400][Manager] Support Airflow schedule engine (#11479)
Browse files Browse the repository at this point in the history
  • Loading branch information
Zkplo authored Nov 20, 2024
1 parent 3700baa commit befe172
Show file tree
Hide file tree
Showing 36 changed files with 2,864 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "Full representation of the connection.")
public class AirflowConnection {

@JsonProperty("connection_id")
@ApiModelProperty("The connection ID.")
private String connectionId;

@JsonProperty("conn_type")
@ApiModelProperty("The connection type.")
private String connType;

@JsonProperty("description")
@ApiModelProperty("The description of the connection.")
private String description;

@JsonProperty("host")
@ApiModelProperty("Host of the connection.")
private String host;

@JsonProperty("login")
@ApiModelProperty("Login of the connection.")
private String login;

@JsonProperty("schema")
@ApiModelProperty("Schema of the connection.")
private String schema;

@JsonProperty("port")
@ApiModelProperty("Port of the connection.")
private Integer port;

@JsonProperty("password")
@ApiModelProperty("Password of the connection.")
private String password;

@JsonProperty("extra")
@ApiModelProperty("Additional information description of the connection.")
private String extra;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "DAG Description Information.")
public class DAG {

@JsonProperty("dag_id")
@ApiModelProperty("The ID of the DAG.")
private String dagId;

@JsonProperty("root_dag_id")
@ApiModelProperty("If the DAG is SubDAG then it is the top level DAG identifier. Otherwise, null.")
private String rootDagId;

@JsonProperty("is_paused")
@ApiModelProperty("Whether the DAG is paused.")
private Boolean isPaused;

@JsonProperty("is_active")
@ApiModelProperty("Whether the DAG is currently seen by the scheduler(s).")
private Boolean isActive;

@JsonProperty("description")
@ApiModelProperty("User-provided DAG description, which can consist of several sentences or paragraphs that describe DAG contents.")
private String description;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.util.List;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "Collection of DAGs.")
public class DAGCollection {

@JsonProperty("dags")
@ApiModelProperty("List of DAGs.")
private List<DAG> dags = null;

@JsonProperty("total_entries")
@ApiModelProperty("The length of DAG list.")
private Integer totalEntries;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,50 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "DAGRun Description Information.")
public class DAGRun {

@JsonProperty("conf")
@ApiModelProperty("JSON object describing additional configuration parameters.")
private Object conf;

@JsonProperty("dag_id")
@ApiModelProperty("Airflow DAG id.")
private String dagId;

@JsonProperty("dag_run_id")
@ApiModelProperty("Airflow DAGRun id (Nullable).")
private String dagRunId;

@JsonProperty("end_date")
@ApiModelProperty("The end time of this DAGRun.")
private String endDate;

@JsonProperty("start_date")
@ApiModelProperty("The start time of this DAGRun.")
private String startDate;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.AllArgsConstructor;
import lombok.Builder;
import lombok.Data;
import lombok.NoArgsConstructor;

@Data
@Builder
@NoArgsConstructor
@AllArgsConstructor
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "DAGRunConf Description Information.")
public class DAGRunConf {

@JsonProperty("inlong_group_id")
@ApiModelProperty("Specify the Inlong group ID")
private String inlongGroupId;

@JsonProperty("start_time")
@ApiModelProperty("The start time of DAG scheduling.")
private long startTime;

@JsonProperty("end_time")
@ApiModelProperty("The end time of DAG scheduling.")
private long endTime;

@JsonProperty("boundary_type")
@ApiModelProperty("The offline task boundary type.")
private String boundaryType;

@JsonProperty("cron_expr")
@ApiModelProperty("Cron expression.")
private String cronExpr;

@JsonProperty("seconds_interval")
@ApiModelProperty("Time interval (in seconds).")
private String secondsInterval;

@JsonProperty("connection_id")
@ApiModelProperty("Airflow Connection Id of Inlong Manager.")
private String connectionId;

@JsonProperty("timezone")
@ApiModelProperty("The timezone.")
private String timezone;
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.pojo.schedule.airflow;

import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
import com.fasterxml.jackson.annotation.JsonProperty;
import io.swagger.annotations.ApiModel;
import io.swagger.annotations.ApiModelProperty;
import lombok.Data;

import java.math.BigDecimal;

@Data
@JsonIgnoreProperties(ignoreUnknown = true)
@ApiModel(description = "[RFC7807](https://tools.ietf.org/html/rfc7807) compliant response. ")
public class Error {

@JsonProperty("detail")
@ApiModelProperty("Error Details.")
private String detail;

@JsonProperty("instance")
@ApiModelProperty("Error of the instance.")
private String instance;

@JsonProperty("status")
@ApiModelProperty("Error of the status.")
private BigDecimal status;

@JsonProperty("title")
@ApiModelProperty("Error of the title.")
private String title;

@JsonProperty("type")
@ApiModelProperty("Error of the type.")
private String type;
}
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ public enum ScheduleEngineType {

NONE("None"),
QUARTZ("Quartz"),
AIRFLOW("Airflow"),
DOLPHINSCHEDULER("DolphinScheduler");

private final String type;
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more
* contributor license agreements. See the NOTICE file distributed with
* this work for additional information regarding copyright ownership.
* The ASF licenses this file to You under the Apache License, Version 2.0
* (the "License"); you may not use this file except in compliance with
* the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.inlong.manager.schedule.airflow;

/**
* Contains constants for interacting with the Airflow API.
*/
public class AirFlowAPIConstant {

public static final String DEFAULT_TIMEZONE = "Asia/Shanghai";
public static final String INLONG_OFFLINE_DAG_TASK_PREFIX = "inlong_offline_task_";
public static final String SUBMIT_OFFLINE_JOB_URI = "/inlong/manager/api/group/submitOfflineJob";

// AirflowConnection
public static final String LIST_CONNECTIONS_URI = "/api/v1/connections";
public static final String GET_CONNECTION_URI = "/api/v1/connections/{connection_id}";

// DAG
public static final String LIST_DAGS_URI = "/api/v1/dags";
public static final String UPDATE_DAG_URI = "/api/v1/dags/{dag_id}";

// DAGRun
public static final String TRIGGER_NEW_DAG_RUN_URI = "/api/v1/dags/{dag_id}/dagRuns";

}
Loading

0 comments on commit befe172

Please sign in to comment.