Skip to content

Commit

Permalink
add test
Browse files Browse the repository at this point in the history
  • Loading branch information
yennanliu committed Jan 18, 2024
1 parent c6049bd commit ded1b14
Showing 1 changed file with 108 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,108 @@
package com.yen.FlinkRestService.Service;

import org.apache.commons.lang3.StringUtils;
import org.apache.zeppelin.client.ClientConfig;
import org.apache.zeppelin.client.ExecuteResult;
import org.apache.zeppelin.client.ZSession;
import org.apache.zeppelin.client.ZeppelinClient;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.HashMap;
import java.util.Map;

import static org.junit.jupiter.api.Assertions.*;

class ZeppelinServiceTest {

    // Session under test; created in zSessionSparkTest() and stopped in its finally block.
    private ZSession session = null;

    /**
     * End-to-end smoke test of the Zeppelin {@code ZSession} client API against a
     * Spark interpreter.
     *
     * <p>NOTE(review): this is an integration test — it requires a running Zeppelin
     * server listening on {@code http://localhost:8082} with the Spark interpreter
     * configured. It exercises Scala, PySpark, matplotlib (ipyspark), SparkR and
     * Spark SQL paragraphs in a single session.
     */
    @Test
    public void zSessionSparkTest() {

        try {
            ClientConfig clientConfig = new ClientConfig("http://localhost:8082");
            Map<String, String> intpProperties = new HashMap<>();
            // Run Spark locally with as many worker threads as logical cores.
            intpProperties.put("spark.master", "local[*]");

            session = ZSession.builder()
                    .setClientConfig(clientConfig)
                    .setInterpreter("spark")
                    .setIntpProperties(intpProperties)
                    .build();

            session.start();
            System.out.println("Spark Web UI: " + session.getWeburl());

            // Scala paragraph producing a single text result.
            ExecuteResult result = session.execute("println(sc.version)");
            System.out.println("Spark Version: " + result.getResults().get(0).getData());

            // Scala paragraph producing multiple results (text output + table output).
            result = session.execute("println(sc.version)\n" +
                    "val df = spark.createDataFrame(Seq((1,\"a\"), (2,\"b\")))\n" +
                    "z.show(df)");

            // The first result is text output.
            System.out.println("Result 1: type: " + result.getResults().get(0).getType() +
                    ", data: " + result.getResults().get(0).getData());
            // The second result is the table output of z.show(df).
            System.out.println("Result 2: type: " + result.getResults().get(1).getType() +
                    ", data: " + result.getResults().get(1).getData());
            System.out.println("Spark Job Urls:\n" + StringUtils.join(result.getJobUrls(), "\n"));

            // Error output: division by zero should surface via the result status,
            // not an exception.
            result = session.execute("1/0");
            System.out.println("Result status: " + result.getStatus() +
                    ", data: " + result.getResults().get(0).getData());

            // PySpark paragraph; registers a temp table consumed by the SQL paragraph below.
            result = session.execute("pyspark", "df = spark.createDataFrame([(1,'a'),(2,'b')])\n" +
                    "df.registerTempTable('df')\n" +
                    "df.show()");
            System.out.println("PySpark dataframe: " + result.getResults().get(0).getData());

            // matplotlib inline plotting through the IPython Spark interpreter.
            result = session.execute("ipyspark", "%matplotlib inline\n" +
                    "import matplotlib.pyplot as plt\n" +
                    "plt.plot([1,2,3,4])\n" +
                    "plt.ylabel('some numbers')\n" +
                    "plt.show()");
            System.out.println("Matplotlib result, type: " + result.getResults().get(0).getType() +
                    ", data: " + result.getResults().get(0).getData());

            // SparkR paragraph.
            result = session.execute("r", "df <- as.DataFrame(faithful)\nhead(df)");
            System.out.println("Sparkr dataframe: " + result.getResults().get(0).getData());

            // Spark SQL against the temp table registered by the PySpark paragraph.
            result = session.execute("sql", "select * from df");
            System.out.println("Spark Sql dataframe: " + result.getResults().get(0).getData());

            // Invalid Spark SQL: expected to report an error status rather than throw.
            result = session.execute("sql", "select * from unknown_table");
            System.out.println("Result status: " + result.getStatus() +
                    ", data: " + result.getResults().get(0).getData());
        } catch (Exception e) {
            // BUGFIX: the original only called e.printStackTrace(), so this test could
            // never fail even when the Zeppelin workflow was completely broken.
            // fail(...) preserves the full cause for the test report.
            fail("Zeppelin ZSession Spark workflow failed", e);
        } finally {
            if (session != null) {
                try {
                    session.stop();
                } catch (Exception e) {
                    // Best-effort cleanup: a failure to stop the session must not
                    // mask the actual test outcome.
                    e.printStackTrace();
                }
            }
        }
    }

    /** Trivial sanity test verifying the test harness itself runs. */
    @Test
    public void mtTest() {
        System.out.println(123);
    }

}

0 comments on commit ded1b14

Please sign in to comment.