Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Added delete button for Variables, Pools, Connections and Dag Runs #114

Open
wants to merge 5 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion astronomer_starship/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "2.1.0"
__version__ = "2.2.0"


def get_provider_info():
Expand Down
60 changes: 54 additions & 6 deletions astronomer_starship/compat/starship_compatability.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import json
import logging
import os
from flask import jsonify
from flask import jsonify, Response
from sqlalchemy.orm import Session
from typing import TYPE_CHECKING

Expand All @@ -12,6 +13,9 @@
import pytz


logger = logging.getLogger(__name__)


def get_from_request(args, json, key, required: bool = False) -> "Any":
val = json.get(key, args.get(key))
if val is None and required:
Expand Down Expand Up @@ -146,6 +150,24 @@ def generic_set_one(session: Session, qualname: str, attrs: dict, **kwargs):
raise e


def generic_delete(session: Session, qualname: str, **kwargs):
from http import HTTPStatus
from sqlalchemy import delete

(_, thing_cls) = import_from_qualname(qualname)

try:
filters = [getattr(thing_cls, attr) == val for attr, val in kwargs.items()]
deleted_rows = session.execute(delete(thing_cls).where(*filters)).rowcount
session.commit()
logger.info(f"Deleted {deleted_rows} rows for table {qualname}")
return Response(None, status=HTTPStatus.NO_CONTENT)
except Exception as e:
logger.error(f"Error deleting rows for table {qualname}: {e}")
session.rollback()
raise e


def get_test_data(attrs: dict, method: "Union[str, None]" = None) -> "Dict[str, Any]":
"""
>>> get_test_data(method="POST", attrs={"key": {"attr": "key", "methods": [("POST", True)], "test_value": "key"}})
Expand Down Expand Up @@ -195,10 +217,21 @@ def set_env_vars(cls):
res.status_code = 409
raise NotImplementedError()

@classmethod
def delete_env_vars(cls):
"""This is not possible to do via API, so return an error"""
res = jsonify({"error": "Not implemented"})
res.status_code = 405
raise NotImplementedError()

@classmethod
def variable_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"key": {"attr": "key", "methods": [("POST", True)], "test_value": "key"},
"key": {
"attr": "key",
"methods": [("POST", True), ("DELETE", True)],
"test_value": "key",
},
"val": {"attr": "val", "methods": [("POST", True)], "test_value": "val"},
"description": {
"attr": "description",
Expand All @@ -217,12 +250,15 @@ def set_variable(self, **kwargs):
self.session, "airflow.models.Variable", self.variable_attrs(), **kwargs
)

def delete_variable(self, **kwargs):
return generic_delete(self.session, "airflow.models.Variable", **kwargs)

@classmethod
def pool_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"name": {
"attr": "pool",
"methods": [("POST", True)],
"methods": [("POST", True), ("DELETE", True)],
"test_value": "test_name",
},
"slots": {"attr": "slots", "methods": [("POST", True)], "test_value": 1},
Expand All @@ -241,12 +277,15 @@ def set_pool(self, **kwargs):
self.session, "airflow.models.Pool", self.pool_attrs(), **kwargs
)

def delete_pool(self, **kwargs):
return generic_delete(self.session, "airflow.models.Pool", **kwargs)

@classmethod
def connection_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"conn_id": {
"attr": "conn_id",
"methods": [("POST", True)],
"methods": [("POST", True), ("DELETE", True)],
"test_value": "conn_id",
},
"conn_type": {
Expand Down Expand Up @@ -301,6 +340,9 @@ def set_connection(self, **kwargs):
self.session, "airflow.models.Connection", self.connection_attrs(), **kwargs
)

def delete_connection(self, **kwargs):
return generic_delete(self.session, "airflow.models.Connection", **kwargs)

@classmethod
def dag_attrs(cls) -> "Dict[str, AttrDesc]":
return {
Expand Down Expand Up @@ -442,7 +484,7 @@ def dag_runs_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"dag_id": {
"attr": "dag_id",
"methods": [("GET", True)],
"methods": [("GET", True), ("DELETE", True)],
"test_value": "dag_0",
},
# Limit is the number of rows to return.
Expand Down Expand Up @@ -591,6 +633,9 @@ def set_dag_runs(self, dag_runs: list):
dag_runs = self.insert_directly("dag_run", dag_runs)
return {"dag_runs": dag_runs, "dag_run_count": self._get_dag_run_count(dag_id)}

def delete_dag_runs(self, **kwargs):
return generic_delete(self.session, "airflow.models.DagRun", **kwargs)

@classmethod
def task_instances_attrs(cls) -> "Dict[str, AttrDesc]":
epoch = datetime.datetime(1970, 1, 1, 0, 0)
Expand All @@ -600,7 +645,7 @@ def task_instances_attrs(cls) -> "Dict[str, AttrDesc]":
return {
"dag_id": {
"attr": "dag_id",
"methods": [("GET", True)],
"methods": [("GET", True), ("DELETE", True)],
"test_value": "dag_0",
},
# Limit is the number of rows to return.
Expand Down Expand Up @@ -853,6 +898,9 @@ def set_task_instances(self, task_instances: list):
task_instances = self.insert_directly("task_instance", task_instances)
return {"task_instances": task_instances}

def delete_task_instances(self, **kwargs):
return generic_delete(self.session, "airflow.models.TaskInstance", **kwargs)

def insert_directly(self, table_name, items):
from sqlalchemy.exc import InvalidRequestError
from sqlalchemy import MetaData
Expand Down
33 changes: 24 additions & 9 deletions astronomer_starship/src/component/MigrateButton.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,16 @@
import React, { useState } from 'react';
import axios from 'axios';
import { Button, useToast } from '@chakra-ui/react';
import { MdErrorOutline } from 'react-icons/md';
import { FaCheck } from 'react-icons/fa';
import { MdErrorOutline, MdDeleteForever } from 'react-icons/md';
import { GoUpload } from 'react-icons/go';
import PropTypes from 'prop-types';

function checkStatus(status, exists) {
if (status === 204)
return false;
return status === 200 || exists;
}

export default function MigrateButton({
route, headers, existsInRemote, sendData, isDisabled,
}) {
Expand All @@ -16,13 +21,23 @@ export default function MigrateButton({
const [exists, setExists] = useState(existsInRemote);
function handleClick() {
setLoading(true);
axios.post(route, sendData, { headers })
axios({
method: exists ? 'delete' : 'post',
url: route,
headers,
params: sendData,
})
.then((res) => {
setLoading(false);
setExists(res.status === 200);
setExists(checkStatus(res.status, exists));
toast({
title: 'Success',
status: 'success',
isClosable: true,
})
})
.catch((err) => {
setExists(false);
setExists(exists);
setLoading(false);
toast({
title: err.response?.data?.error || err.response?.data || err.message,
Expand All @@ -34,19 +49,19 @@ export default function MigrateButton({
}
return (
<Button
isDisabled={loading || isDisabled || exists}
isDisabled={loading || isDisabled}
isLoading={loading}
loadingText="Loading"
variant="solid"
leftIcon={(
error ? <MdErrorOutline /> : exists ? <FaCheck /> : !loading ? <GoUpload /> : <span />
error ? <MdErrorOutline /> : exists ? <MdDeleteForever /> : !loading ? <GoUpload /> : <span />
)}
colorScheme={
exists ? 'green' : loading ? 'teal' : error ? 'red' : 'teal'
exists ? 'red' : loading ? 'teal' : error ? 'red' : 'teal'
}
onClick={() => handleClick()}
>
{exists ? 'Ok' : loading ? '' : error ? 'Error!' : 'Migrate'}
{exists ? 'Delete' : loading ? '' : error ? 'Error!' : 'Migrate'}
</Button>
);
}
Expand Down
52 changes: 44 additions & 8 deletions astronomer_starship/src/pages/DAGHistoryPage.jsx
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,8 @@ import {
} from '@chakra-ui/react';
import PropTypes from 'prop-types';
import axios from 'axios';
import { MdErrorOutline } from 'react-icons/md';
import { MdErrorOutline, MdDeleteForever } from 'react-icons/md';
import { GrDocumentMissing } from 'react-icons/gr';
import { FaCheck } from 'react-icons/fa';
import { GoUpload } from 'react-icons/go';
import humanFormat from 'human-format';
import { ExternalLinkIcon, RepeatIcon } from '@chakra-ui/icons';
Expand Down Expand Up @@ -56,6 +55,44 @@ function DAGHistoryMigrateButton({
const percent = 100;

function handleClick() {

function deleteRuns() {
setLoadPerc(percent * 0.5);
axios({
method: 'delete',
url: proxyUrl(url + constants.DAG_RUNS_ROUTE),
headers: proxyHeaders(token),
params: { dag_id: dagId },
}).then((res) => {
setExists(!(res.status === 204));
dispatch({
type: 'set-dags-data',
dagsData: {
[dagId]: {
remote: {
dag_run_count: 0,
},
},
},
});
setLoadPerc(percent * 1);
setLoadPerc(0);
}).catch((err) => {
setExists(false);
setLoadPerc(percent * 0);
toast({
title: err.response?.data?.error || err.response?.data || err.message,
status: 'error',
isClosable: true,
});
setError(err);
});
}

if (exists) {
deleteRuns();
return;
}
const errFn = (err) => {
setExists(false);
// noinspection PointlessArithmeticExpressionJS
Expand Down Expand Up @@ -117,23 +154,23 @@ function DAGHistoryMigrateButton({
return (
<WithTooltip isDisabled={isDisabled}>
<Button
isDisabled={isDisabled || loadPerc || exists}
isDisabled={isDisabled || loadPerc}
// isLoading={loading}
// loadingText="Loading"
variant="solid"
leftIcon={(
error ? <MdErrorOutline />
: exists ? <FaCheck />
: exists ? <MdDeleteForever />
: isDisabled ? <GrDocumentMissing />
: !loadPerc ? <GoUpload />
: <span />
)}
colorScheme={
exists ? 'green' : isDisabled ? 'gray' : error ? 'red' : 'teal'
exists ? 'red' : isDisabled ? 'gray' : error ? 'red' : 'teal'
}
onClick={() => handleClick()}
>
{exists ? 'Ok'
{exists ? 'Delete'
: loadPerc ? (
<CircularProgress thickness="20px" size="30px" value={loadPerc} />
)
Expand Down Expand Up @@ -326,8 +363,7 @@ export default function DAGHistoryPage({ state, dispatch }) {
isDisabled={
!info.row.original.remote?.dag_id ? 'DAG not found in remote'
: !info.row.original.local.dag_run_count ? 'No DAG Runs to migrate'
: info.row.original.remote?.dag_run_count ? 'DAG Runs already exist in remote'
: false
: false
}
dispatch={dispatch}
/>
Expand Down
2 changes: 1 addition & 1 deletion astronomer_starship/starship.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@
from airflow.security import permissions
from airflow.www import auth

ALLOWED_PROXY_METHODS = ["GET", "POST", "PATCH"]
ALLOWED_PROXY_METHODS = ["GET", "POST", "PATCH", "DELETE"]


class Starship(BaseView):
Expand Down
Loading
Loading