Skip to content

Commit

Permalink
Fix Query JobInfo
Browse files Browse the repository at this point in the history
  • Loading branch information
liaowenlen committed Aug 30, 2024
1 parent 15a22b5 commit 75d6837
Showing 1 changed file with 55 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -24,48 +24,69 @@
import com.webank.wedatasphere.dss.apiservice.core.config.ApiServiceConfiguration;
import com.webank.wedatasphere.dss.apiservice.core.constant.ParamType;
import com.webank.wedatasphere.dss.apiservice.core.constant.RequireEnum;
import com.webank.wedatasphere.dss.apiservice.core.dao.*;
import com.webank.wedatasphere.dss.apiservice.core.dao.ApiServiceAccessDao;
import com.webank.wedatasphere.dss.apiservice.core.dao.ApiServiceDao;
import com.webank.wedatasphere.dss.apiservice.core.dao.ApiServiceParamDao;
import com.webank.wedatasphere.dss.apiservice.core.dao.ApiServiceTokenManagerDao;
import com.webank.wedatasphere.dss.apiservice.core.dao.ApiServiceVersionDao;
import com.webank.wedatasphere.dss.apiservice.core.exception.ApiServiceQueryException;
import com.webank.wedatasphere.dss.apiservice.core.exception.ApiServiceRuntimeException;
import com.webank.wedatasphere.dss.apiservice.core.execute.ApiServiceExecuteJob;
import com.webank.wedatasphere.dss.apiservice.core.execute.DefaultApiServiceJob;
import com.webank.wedatasphere.dss.apiservice.core.execute.ExecuteCodeHelper;
import com.webank.wedatasphere.dss.apiservice.core.execute.LinkisJobSubmit;
import com.webank.wedatasphere.dss.apiservice.core.jdbc.DatasourceService;
import com.webank.wedatasphere.dss.apiservice.core.service.ApiService;
import com.webank.wedatasphere.dss.apiservice.core.util.DateUtil;
import com.webank.wedatasphere.dss.apiservice.core.util.SQLCheckUtil;
import com.webank.wedatasphere.dss.apiservice.core.vo.*;
import com.webank.wedatasphere.dss.apiservice.core.exception.ApiServiceRuntimeException;
import com.webank.wedatasphere.dss.apiservice.core.service.ApiServiceQueryService;
import com.webank.wedatasphere.dss.apiservice.core.util.AssertUtil;
import com.webank.wedatasphere.dss.apiservice.core.util.DateUtil;
import com.webank.wedatasphere.dss.apiservice.core.util.ModelMapperUtil;
import com.webank.wedatasphere.dss.apiservice.core.util.SQLCheckUtil;
import com.webank.wedatasphere.dss.apiservice.core.vo.ApiAccessVo;
import com.webank.wedatasphere.dss.apiservice.core.vo.ApiServiceVo;
import com.webank.wedatasphere.dss.apiservice.core.vo.ApiVersionVo;
import com.webank.wedatasphere.dss.apiservice.core.vo.ParamVo;
import com.webank.wedatasphere.dss.apiservice.core.vo.QueryParamVo;
import com.webank.wedatasphere.dss.apiservice.core.vo.TokenManagerVo;
import org.apache.commons.collections.MapUtils;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.linkis.bml.client.BmlClient;
import org.apache.linkis.bml.client.BmlClientFactory;
import org.apache.linkis.bml.protocol.BmlDownloadResponse;
import org.apache.linkis.common.io.FsPath;
import org.apache.linkis.protocol.utils.TaskUtils;
import org.apache.linkis.server.security.SecurityFilter;
import org.apache.linkis.storage.source.FileSource;
import org.apache.linkis.ujes.client.UJESClient;
import org.apache.linkis.ujes.client.request.JobInfoAction;
import org.apache.linkis.ujes.client.response.JobExecuteResult;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.math3.util.Pair;
import org.apache.linkis.ujes.client.response.JobInfoResult;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.RowMapper;
import org.springframework.jdbc.support.JdbcUtils;
import org.springframework.stereotype.Service;
import org.springframework.util.LinkedCaseInsensitiveMap;
import org.springframework.web.context.request.RequestAttributes;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import scala.Tuple3;

import javax.annotation.PostConstruct;
import javax.servlet.http.HttpServletRequest;
import java.io.IOException;
import java.io.InputStream;
import java.sql.ResultSet;
import java.sql.ResultSetMetaData;
import java.sql.SQLException;
import java.util.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

Expand Down Expand Up @@ -460,7 +481,32 @@ protected Object getColumnValue(ResultSet rs, int index) throws SQLException {

@Override
public ApiServiceJob getJobByTaskId(String taskId){
return runJobs.get(taskId);
ApiServiceJob apiServiceJob = runJobs.get(taskId);

// 集群环境下,若未读取到缓存结果,则从linkis-jobhistory服务查询
if (null == apiServiceJob) {
RequestAttributes attributes = RequestContextHolder.getRequestAttributes();
HttpServletRequest httpServletRequest = ((ServletRequestAttributes) attributes).getRequest();
String loginUsername = SecurityFilter.getLoginUsername(httpServletRequest);

UJESClient client = LinkisJobSubmit.getClient();
JobInfoAction jobInfoAction = JobInfoAction.builder().setTaskId(taskId).setUser(loginUsername).build();
JobInfoResult jobInfoResult = (JobInfoResult) client.executeUJESJob(jobInfoAction);
Map<String, Object> taskInfo = (Map<String, Object>) jobInfoResult.getTask();

if (MapUtils.isNotEmpty(taskInfo)) {
JobExecuteResult jobExecuteResult = new JobExecuteResult();
jobExecuteResult.setTaskID(taskId);
jobExecuteResult.setUser(MapUtils.getString(taskInfo, "executeUser"));

apiServiceJob = new ApiServiceJob();
apiServiceJob.setSubmitUser(MapUtils.getString(taskInfo, "executeUser"));
apiServiceJob.setProxyUser(MapUtils.getString(taskInfo, "umUser"));
apiServiceJob.setJobExecuteResult(jobExecuteResult);
}
}

return apiServiceJob;
}

}

0 comments on commit 75d6837

Please sign in to comment.