Skip to content
This repository has been archived by the owner on Nov 19, 2024. It is now read-only.

Don't work with pandas udf #6

Open
amoyrand opened this issue Mar 30, 2021 · 8 comments
Open

Don't work with pandas udf #6

amoyrand opened this issue Mar 30, 2021 · 8 comments

Comments

@amoyrand
Copy link

Hello
I'm trying to replicate your example in my own project.
But I have an issue with python udf: always run into this error ModuleNotFoundError: No module named 'pipelines'

I simply changed your code as is:

amazon.py:

# Example ETL with no parameters - see etl() function

from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, col, current_timestamp
from pipelines.utils import transformations, configmanagement as cm
from pyspark.sql.types import StructType, StructField, DoubleType, StringType

spark = SparkSession.builder.getOrCreate()


def extract_Amazon(filePath):
    return spark.read.format("parquet").load(filePath)

def transform_Amazon(df):
    df = df.withColumn("meta_timestamp", lit(current_timestamp()))
    df = transformations.addDummyColumn(df)
    return df

def load_Amazon(df):
  spark.sql("DROP TABLE IF EXISTS amazon")
  df.write.format("parquet").mode("overwrite").saveAsTable("amazon")
  return

def addone(df):
    df['price'] = df['price'] + 1
    return df

def etl():
  df = extract_Amazon("/databricks-datasets/amazon/test4K")
  df = transform_Amazon(df)

  schema = StructType([
        StructField("brand", StringType(), True),
        StructField("price", DoubleType(), True),
    ])

  df = df.select('brand', 'price').groupBy("brand").applyInPandas(transformations.addone, schema)
  print(df.show())
  # load_Amazon(df)

and it gives me this error:

pyspark.sql.utils.PythonException: An exception was thrown from a UDF: 'pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 469, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'pipelines''. Full traceback below:
Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 469, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'pipelines'

During handling of the above exception, another exception occurred:

pyspark.serializers.SerializationError: Caused by Traceback (most recent call last):
  File "/databricks/spark/python/pyspark/serializers.py", line 165, in _read_with_length
    return self.loads(obj)
  File "/databricks/spark/python/pyspark/serializers.py", line 469, in loads
    return pickle.loads(obj, encoding=encoding)
ModuleNotFoundError: No module named 'pipelines'

Any idea how to solve this ?

@simondmorias
Copy link
Member

the pipelines folder is not in your path. See main.py, basically it can't find the module.

@amoyrand
Copy link
Author

in the main, I have the lines:

  dirname = os.path.abspath(os.path.dirname(__file__))
   sys.path.insert(0, (os.path.join(dirname, 'pipelines')))

But still get the ModuleNotFoundError: No module named 'pipelines' error...

@simondmorias
Copy link
Member

simondmorias commented Apr 1, 2021 via email

@amoyrand
Copy link
Author

amoyrand commented Apr 8, 2021

Hello @simondmorias .
I finally had this working. thanks for you tips.

I'm now facing another problem:

I'm using sedona with databricks.
when running my code in a notebook everything goes well (I installed the thrid party jars on the cluster)

but when running with databricks-connect, i'm getting a TypeError: 'JavaPackage' object is not callable when running;

spark = SparkSession. \
              builder. \
              appName('appName'). \
              config("spark.serializer", KryoSerializer.getName). \
              config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
              config('spark.jars.packages',
                     'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.0-incubating,'
                     'org.datasyslab:geotools-wrapper:geotools-24.0'). \
              getOrCreate()

SedonaRegistrator.registerAll(spark)

I guess that the jars are not well imported in local.

Did you ever experienced this? do you know how to import local jars with databricks-connect ?

Thank you

@simondmorias
Copy link
Member

On your local machine run databricks-connect get-jar-dir - add the jars there.

@amoyrand
Copy link
Author

amoyrand commented Apr 12, 2021

Hello.
I got the registerAll working but then have another issue with databricks connect:

from pyspark.sql import SparkSession
from sedona.register import SedonaRegistrator
from sedona.utils import KryoSerializer, SedonaKryoRegistrator

sparkSession = SparkSession. \
    builder. \
    master("local[*]").\
    appName('appName'). \
    config("spark.serializer", KryoSerializer.getName). \
    config("spark.kryo.registrator", SedonaKryoRegistrator.getName). \
    config('spark.jars.packages',
           'org.apache.sedona:sedona-python-adapter-3.0_2.12:1.0.0-incubating,'
           'org.datasyslab:geotools-wrapper:geotools-24.0'). \
    getOrCreate()


SedonaRegistrator.registerAll(sparkSession)

print(sparkSession.sql('describe function st_point').show())

print(sparkSession.sql("SELECT st_point(41.40338, 2.17403) AS geometry").show())

here I can describe the UDF st_point but when trying to use it, it fails with:

Undefined function: 'st_point'. This function is neither a registered temporary function nor a permanent function registered in the database 'default'.; line 1 pos 0

full log here: https://filebin.net/yzy0tn58myzso8l4/log.txt?t=cntz46u4

Any idea what happens here ?

Thanks a lot for your help

@simondmorias
Copy link
Member

I would post on StackOverflow - that is more of a general Spark problem rather than this container.

@VellStef
Copy link

@amoyrand how did you solve the original problem where pipelines were not detected ?

Sign up for free to subscribe to this conversation on GitHub. Already have an account? Sign in.
Labels
None yet
Projects
None yet
Development

No branches or pull requests

3 participants