Skip to content

Commit

Permalink
BE fix job submit logic, fix attr, update test, FE fix job sumit v-mo…
Browse files Browse the repository at this point in the history
…del, attr
  • Loading branch information
yennanliu committed Jan 16, 2024
1 parent deb9dae commit 1f55f58
Show file tree
Hide file tree
Showing 5 changed files with 36 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,10 @@
import com.google.gson.FieldNamingPolicy;
import com.google.gson.Gson;
import com.google.gson.GsonBuilder;
import com.yen.FlinkRestService.Repository.JobJarRepository;
import com.yen.FlinkRestService.Repository.JobRepository;
import com.yen.FlinkRestService.model.Job;
import com.yen.FlinkRestService.model.JobJar;
import com.yen.FlinkRestService.model.dto.job.JobSubmitDto;
import com.yen.FlinkRestService.model.dto.job.JobUpdateDto;
import com.yen.FlinkRestService.model.response.JobOverview;
Expand All @@ -22,6 +24,7 @@
import org.springframework.web.client.RestTemplate;

import java.util.List;
import java.util.Optional;

@Slf4j
@Service
Expand All @@ -30,6 +33,9 @@ public class JobService {
@Autowired
JobRepository jobRepository;

@Autowired
JobJarRepository jobJarRepository;

@Autowired
private RestTemplateService restTemplateService;

Expand Down Expand Up @@ -64,7 +70,11 @@ public void addJob(JobSubmitDto jobSubmitDto) {
*/
String baseUrl = BASE_URL + "/jars/";
// TODO : fix below to send entry-class, parallelism to flink
String url = baseUrl + jobSubmitDto.getJarId() + "/run"; // + "entry-class=" + jobSubmitDto.getEntryClass();
if (!jobJarRepository.findById(jobSubmitDto.getJarId()).isPresent()){
throw new RuntimeException("Job jar NOT exists, Jar ID = " + jobSubmitDto.getJarId());
}
JobJar jobJar = jobJarRepository.findById(jobSubmitDto.getJarId()).get();
String url = baseUrl + jobJar.getSavedJarName() + "/run"; // + "entry-class=" + jobSubmitDto.getEntryClass();
System.out.println("url = " + url);

// Set request body
Expand All @@ -82,7 +92,7 @@ public void addJob(JobSubmitDto jobSubmitDto) {
// save to DB
Job job = new Job();
job.setJobId(jobSubmitResponse.getJobid());
job.setName(jobSubmitDto.getJarId());
job.setName(jobJar.getId() + "-" + jobJar.getSavedJarName());
job.setStartTime( System.currentTimeMillis()); // TODO : double check
jobRepository.save(job);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
@ToString
public class JobSubmitDto {

private String jarId; // 927a9fac-c7bf-48cd-b1b8-b4e536449eb0_StreamSQLExample.jar
private Integer jarId;
private String entryClass; // "org.apache.flink.table.examples.java.basics.StreamSQLExample"
private Integer parallelism = 1; // default as 1
private String programArgs;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ public class JarUtil {
public String getJarNameFromRepsonse(JarUploadResponse jarUploadResponse){
try{
String fileName = jarUploadResponse.getFilename();
String[] list = fileName.split("/");
String[] list = fileName.split("/flink-web-upload");
return list[list.length-1];
}catch (Exception e){
e.printStackTrace();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,24 @@ class JarUtilTest {
@Test
public void TestGetJarName(){

/**
* jobSubmitDto = JobSubmitDto(
* jarId=/var/folders/51/3j6n8rwd74g2tsjt5jhcy91w0000gn/T/flink-web-2837802e-a6d4-46f8-bc36-5f695312ec9c/flink-web-upload/ac26d385-88b2-4d0f-9697-b743d3a105dc_SessionWindowing.jar,
* entryClass=string,
* parallelism=0,
* programArgs=string,
* savePointPath=string,
* allowNonRestoredState=true
* )
*
*/
JarUtil jarUtil = new JarUtil();
JarUploadResponse jarUploadResponse = new JarUploadResponse();
jarUploadResponse.setFilename("/var/folders/tz/5r4lbzxj5hs5q87gwdwyjnph0000gn/T/flink-web-2f052123-bb4e-4d85-80f7-44f673625f02/flink-web-upload/31c74f38-9278-4c72-ab8a-c1bb2e47004c_TopSpeedWindowing.jar");

System.out.println(jarUploadResponse.toString());
System.out.println("jarUploadResponse = " + jarUploadResponse);

System.out.println(jarUtil.getJarNameFromRepsonse(jarUploadResponse));
System.out.println("response jar id = " + jarUtil.getJarNameFromRepsonse(jarUploadResponse));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,13 +12,9 @@
<form>
<div class="form-group">
<label>Jar ID</label>
<select class="form-control" v-model="savedJarName" required>
<option
v-for="jar in jars"
:key="jar.id"
:value="jar.savedJarName"
>
Name : {{ jar.savedJarName }}
<select class="form-control" v-model="savedJarId" required>
<option v-for="jar in jars" :key="jar.id" :value="jar.id">
Name : {{ jar.id + " " + jar.savedJarName }}
</option>
</select>
</div>
Expand All @@ -32,8 +28,8 @@
</div>
</div>
</template>
<script>

<script>
import swal from "sweetalert";
import axios from "axios";
Expand All @@ -44,7 +40,7 @@ export default {
allowNonRestoredState: null,
entryClass: null,
jarId: null,
savedJarName: null,
savedJarId: null,
parallelism: null,
programArgs: null,
savePointPath: null,
Expand All @@ -59,7 +55,6 @@ export default {
.get("http://localhost:9999/" + "jar/")
.then((res) => {
this.jars = res.data;
console.log(">>> (getJars) this.jars = " + JSON.stringify(this.jars));
})
.catch((err) => console.log(err));
},
Expand All @@ -68,7 +63,7 @@ export default {
const newJob = {
allowNonRestoredState: null,
entryClass: null,
jarId: this.savedJarName,
jarId: this.savedJarId,
parallelism: 1,
programArgs: null,
savePointPath: null,
Expand Down Expand Up @@ -102,12 +97,11 @@ export default {
},
};
</script>
<style scoped>

<style scoped>
h4 {
font-family: "Roboto", sans-serif;
color: #484848;
font-weight: 700;
}
</style>

0 comments on commit 1f55f58

Please sign in to comment.