Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add test for udf script output #137

Merged
merged 5 commits into from
Jun 20, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ markers = [
"configuration: tests related to pyexasol settings and configuration.",
"edge_cases: tests related to pyexasol and exasol edge cases scenarios.",
"tls: tests related to tls.",
"udf: tests related to user defined functions.",
"misc: miscellaneous tests which did not fit in the other categories."
]

78 changes: 78 additions & 0 deletions test/integration/udf_test.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
import pytest
import pyexasol
from inspect import cleandoc


@pytest.fixture
def logging_address():
# 172.17.0.1 is an IP address of docker host in Linux
yield ("172.17.0.1", 8580)


@pytest.fixture
def connection(dsn, user, password, schema, logging_address):
_, port = logging_address
con = pyexasol.connect(
dsn=dsn,
user=user,
password=password,
schema=schema,
udf_output_bind_address=("", port),
udf_output_connect_address=logging_address,
)

yield con

con.close()


@pytest.fixture
def echo(connection, logging_address):
name = "ECHO"
ip, port = logging_address
udf = cleandoc(
f"""
--/
CREATE OR REPLACE LUA SCALAR SCRIPT {name}(text VARCHAR(2000))
RETURNS VARCHAR(2000) AS

function run(ctx)
local socket = require("socket")
local tcp_socket = socket.tcp()
local ok, err = tcp_socket:connect("{ip}", {port})
tcp_socket:send(ctx.text)
tcp_socket:shutdown()
return ctx.text
end
/
"""
)
connection.execute(udf)

def executor(text):
udf_call = f"SELECT {name}('{text}');"
stmt, logfiles = connection.execute_udf_output(udf_call)
result = stmt.fetchval()
log = "LOG: " + "".join((log.read_text() for log in logfiles))
return result, log

yield executor

drop = f"DROP SCRIPT IF EXISTS {name};"
connection.execute(drop)


@pytest.mark.udf
def test_udf_output(connection, echo):
text = "Some text which should be echoed"

result, log = echo(text)

expected_result = text
expected_log = f"LOG: {text}"

actual_result = result
actual_log = log

assert actual_result == expected_result
assert actual_log == expected_log
Loading