Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Detect direct access to cloud storage and raise a deprecation warning #1506

Merged
merged 12 commits into from
Apr 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
167 changes: 150 additions & 17 deletions src/databricks/labs/ucx/source_code/pyspark.py
Original file line number Diff line number Diff line change
Expand Up @@ -14,18 +14,43 @@
from databricks.labs.ucx.source_code.queries import FromTable


class AstHelper:
@staticmethod
def get_full_function_name(node):
if isinstance(node.func, ast.Attribute):
return AstHelper._get_value(node.func)

if isinstance(node.func, ast.Name):
return node.func.id

return None

@staticmethod
def _get_value(node):
if isinstance(node.value, ast.Name):
return node.value.id + '.' + node.attr

if isinstance(node.value, ast.Attribute):
return AstHelper._get_value(node.value) + '.' + node.attr

return None


@dataclass
class Matcher(ABC):
method_name: str
min_args: int
max_args: int
table_arg_index: int
table_arg_name: str | None = None
call_context: dict[str, set[str]] | None = None

def matches(self, node: ast.AST):
if not (isinstance(node, ast.Call) and isinstance(node.func, ast.Attribute)):
return False
return self._get_table_arg(node) is not None
return (
isinstance(node, ast.Call)
and isinstance(node.func, ast.Attribute)
and self._get_table_arg(node) is not None
)

@abstractmethod
def lint(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) -> Iterator[Advice]:
Expand All @@ -39,9 +64,26 @@ def _get_table_arg(self, node: ast.Call):
if len(node.args) > 0:
return node.args[self.table_arg_index] if self.min_args <= len(node.args) <= self.max_args else None
assert self.table_arg_name is not None
if not node.keywords:
return None
arg = next(kw for kw in node.keywords if kw.arg == self.table_arg_name)
return arg.value if arg is not None else None

def _check_call_context(self, node: ast.Call) -> bool:
assert isinstance(node.func, ast.Attribute) # Avoid linter warning
func_name = node.func.attr
qualified_name = AstHelper.get_full_function_name(node)

# Check if the call_context is None as that means all calls are checked
if self.call_context is None:
return True

# Get the qualified names from the call_context dictionary
qualified_names = self.call_context.get(func_name)

# Check if the qualified name is in the set of qualified names that are allowed
return qualified_name in qualified_names if qualified_names else False


@dataclass
class QueryMatcher(Matcher):
Expand Down Expand Up @@ -78,19 +120,8 @@ class TableNameMatcher(Matcher):

def lint(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) -> Iterator[Advice]:
table_arg = self._get_table_arg(node)
if isinstance(table_arg, ast.Constant):
dst = self._find_dest(index, table_arg.value, from_table.schema)
if dst is not None:
yield Deprecation(
code='table-migrate',
message=f"Table {table_arg.value} is migrated to {dst.destination()} in Unity Catalog",
# SQLGlot does not propagate tokens yet. See https://github.com/tobymao/sqlglot/issues/3159
start_line=node.lineno,
start_col=node.col_offset,
end_line=node.end_lineno or 0,
end_col=node.end_col_offset or 0,
)
else:

if not isinstance(table_arg, ast.Constant):
assert isinstance(node.func, ast.Attribute) # always true, avoids a pylint warning
yield Advisory(
code='table-migrate',
Expand All @@ -100,6 +131,21 @@ def lint(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) ->
end_line=node.end_lineno or 0,
end_col=node.end_col_offset or 0,
)
return

dst = self._find_dest(index, table_arg.value, from_table.schema)
if dst is None:
return

yield Deprecation(
code='table-migrate',
message=f"Table {table_arg.value} is migrated to {dst.destination()} in Unity Catalog",
# SQLGlot does not propagate tokens yet. See https://github.com/tobymao/sqlglot/issues/3159
start_line=node.lineno,
start_col=node.col_offset,
end_line=node.end_lineno or 0,
end_col=node.end_col_offset or 0,
)

def apply(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) -> None:
table_arg = self._get_table_arg(node)
Expand Down Expand Up @@ -135,7 +181,62 @@ def lint(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) ->
)

def apply(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) -> None:
raise NotImplementedError("Should never get there!")
# No transformations to apply
return


@dataclass
class DirectFilesystemAccessMatcher(Matcher):
_DIRECT_FS_REFS = {
"s3a://",
"s3n://",
"s3://",
"wasb://",
"wasbs://",
"abfs://",
"abfss://",
"dbfs:/",
"hdfs://",
"file:/",
}

def matches(self, node: ast.AST):
return (
isinstance(node, ast.Call)
and isinstance(node.func, ast.Attribute)
and self._get_table_arg(node) is not None
)

def lint(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) -> Iterator[Advice]:
table_arg = self._get_table_arg(node)

if not isinstance(table_arg, ast.Constant):
return

if any(table_arg.value.startswith(prefix) for prefix in self._DIRECT_FS_REFS):
yield Deprecation(
code='direct-filesystem-access',
message=f"The use of direct filesystem references is deprecated: {table_arg.value}",
start_line=node.lineno,
start_col=node.col_offset,
end_line=node.end_lineno or 0,
end_col=node.end_col_offset or 0,
)
return

if table_arg.value.startswith("/") and self._check_call_context(node):
yield Deprecation(
code='direct-filesystem-access',
message=f"The use of default dbfs: references is deprecated: {table_arg.value}",
start_line=node.lineno,
start_col=node.col_offset,
end_line=node.end_lineno or 0,
end_col=node.end_col_offset or 0,
)

def apply(self, from_table: FromTable, index: MigrationIndex, node: ast.Call) -> None:
# No transformations to apply
return


class SparkMatchers:
Expand Down Expand Up @@ -193,6 +294,37 @@ def __init__(self):
TableNameMatcher("register", 1, 2, 0, "name"),
]

direct_fs_access_matchers = [
DirectFilesystemAccessMatcher("ls", 1, 1, 0, call_context={"ls": {"dbutils.fs.ls"}}),
DirectFilesystemAccessMatcher("cp", 1, 2, 0, call_context={"cp": {"dbutils.fs.cp"}}),
DirectFilesystemAccessMatcher("rm", 1, 1, 0, call_context={"rm": {"dbutils.fs.rm"}}),
DirectFilesystemAccessMatcher("head", 1, 1, 0, call_context={"head": {"dbutils.fs.head"}}),
DirectFilesystemAccessMatcher("put", 1, 2, 0, call_context={"put": {"dbutils.fs.put"}}),
DirectFilesystemAccessMatcher("mkdirs", 1, 1, 0, call_context={"mkdirs": {"dbutils.fs.mkdirs"}}),
DirectFilesystemAccessMatcher("mv", 1, 2, 0, call_context={"mv": {"dbutils.fs.mv"}}),
DirectFilesystemAccessMatcher("text", 1, 3, 0),
DirectFilesystemAccessMatcher("csv", 1, 1000, 0),
DirectFilesystemAccessMatcher("json", 1, 1000, 0),
DirectFilesystemAccessMatcher("orc", 1, 1000, 0),
DirectFilesystemAccessMatcher("parquet", 1, 1000, 0),
DirectFilesystemAccessMatcher("save", 0, 1000, -1, "path"),
DirectFilesystemAccessMatcher("load", 0, 1000, -1, "path"),
DirectFilesystemAccessMatcher("option", 1, 1000, 1), # Only .option("path", "xxx://bucket/path") will hit
DirectFilesystemAccessMatcher("addFile", 1, 3, 0),
DirectFilesystemAccessMatcher("binaryFiles", 1, 2, 0),
DirectFilesystemAccessMatcher("binaryRecords", 1, 2, 0),
DirectFilesystemAccessMatcher("dump_profiles", 1, 1, 0),
DirectFilesystemAccessMatcher("hadoopFile", 1, 8, 0),
DirectFilesystemAccessMatcher("newAPIHadoopFile", 1, 8, 0),
DirectFilesystemAccessMatcher("pickleFile", 1, 3, 0),
DirectFilesystemAccessMatcher("saveAsHadoopFile", 1, 8, 0),
DirectFilesystemAccessMatcher("saveAsNewAPIHadoopFile", 1, 7, 0),
DirectFilesystemAccessMatcher("saveAsPickleFile", 1, 2, 0),
DirectFilesystemAccessMatcher("saveAsSequenceFile", 1, 2, 0),
DirectFilesystemAccessMatcher("saveAsTextFile", 1, 2, 0),
DirectFilesystemAccessMatcher("load_from_path", 1, 1, 0),
]

# nothing to migrate in UserDefinedFunction, see https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UserDefinedFunction.html
# nothing to migrate in UserDefinedTableFunction, see https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.UserDefinedTableFunction.html
self._matchers = {}
Expand All @@ -203,6 +335,7 @@ def __init__(self):
+ spark_dataframereader_matchers
+ spark_dataframewriter_matchers
+ spark_udtfregistration_matchers
+ direct_fs_access_matchers
):
self._matchers[matcher.method_name] = matcher

Expand Down
40 changes: 40 additions & 0 deletions tests/unit/source_code/test_notebook_linter.py
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@
end_line=4,
end_col=1024,
),
Deprecation(
code='direct-filesystem-access',
message='The use of default dbfs: references is deprecated: ' '/mnt/things/e/f/g',
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

start_line=14,
start_col=8,
end_line=14,
end_col=43,
),
Deprecation(
code='dbfs-usage',
message='Deprecated file system path in call to: /mnt/things/e/f/g',
Expand Down Expand Up @@ -82,6 +90,14 @@

""",
[
Deprecation(
code='direct-filesystem-access',
message='The use of default dbfs: references is deprecated: ' '/mnt/things/e/f/g',
start_line=5,
start_col=8,
end_line=5,
end_col=43,
),
Deprecation(
code='dbfs-usage',
message='Deprecated file system path in call to: /mnt/things/e/f/g',
Expand Down Expand Up @@ -154,6 +170,30 @@
MERGE INTO delta.`/dbfs/...` t USING source ON t.key = source.key WHEN MATCHED THEN DELETE
""",
[
Deprecation(
code='direct-filesystem-access',
message='The use of default dbfs: references is deprecated: /mnt/foo/bar',
start_line=15,
start_col=0,
end_line=15,
end_col=34,
),
Deprecation(
code='direct-filesystem-access',
message='The use of direct filesystem references is deprecated: dbfs:/mnt/foo/bar',
start_line=16,
start_col=0,
end_line=16,
end_col=39,
),
Deprecation(
code='direct-filesystem-access',
message='The use of direct filesystem references is deprecated: dbfs://mnt/foo/bar',
start_line=17,
start_col=0,
end_line=17,
end_col=40,
),
Advisory(
code='dbfs-usage',
message='Possible deprecated file system path: dbfs:/...',
Expand Down
Loading
Loading