Skip to content

Commit

Permalink
[INLONG-9041][Manager] Support saving schema information when saving …
Browse files Browse the repository at this point in the history
…iceberg source (#9042)
  • Loading branch information
fuweng11 authored Oct 11, 2023
1 parent a3f8102 commit d7459a2
Show file tree
Hide file tree
Showing 7 changed files with 96 additions and 1 deletion.
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ public enum FieldType {
FLOAT64,
DATETIME,
TIMESTAMP,
TIMESTAMPTZ,
LOCAL_ZONE_TIMESTAMP,
ARRAY,
MAP,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.apache.commons.lang3.StringUtils;

import java.math.BigDecimal;
import java.sql.Timestamp;
import java.util.List;
import java.util.Objects;

Expand Down Expand Up @@ -213,16 +214,27 @@ public static Class<?> sqlTypeToJavaType(String type) {
case DECIMAL:
return BigDecimal.class;
case VARCHAR:
case STRING:
return String.class;
case DATE:
case TIME:
case TIMESTAMP:
return java.util.Date.class;
case TIMESTAMP:
case TIMESTAMPTZ:
return Timestamp.class;
default:
return Object.class;
}
}

/**
* Convert SQL type names to Java type string.
*/
public static String sqlTypeToJavaTypeStr(String type) {
Class<?> clazz = FieldInfoUtils.sqlTypeToJavaType(type);
return clazz == Object.class ? "string" : clazz.getSimpleName().toLowerCase();
}

/**
* Get the FieldFormat of Sort according to type string
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,6 +101,9 @@ public class SourceRequest {
@Length(min = 1, max = 163840, message = "length must be between 1 and 163840")
private String snapshot;

@ApiModelProperty(value = "Whether to sync schema from source after saving or updating. Default is false")
private Boolean enableSyncSchema = false;

@ApiModelProperty("Version")
@NotNull(groups = UpdateValidation.class, message = "version cannot be null")
private Integer version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,7 @@ public static List<IcebergColumnInfo> getColumns(String metastoreUri, String dbN
IcebergColumnInfo info = new IcebergColumnInfo();
info.setName(column.name());
info.setRequired(column.isRequired());
info.setType(column.type().toString());
columnList.add(info);
}
return columnList;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -97,6 +97,9 @@ public Integer saveOpt(SourceRequest request, Integer groupStatus, String operat
setTargetEntity(request, entity);
sourceMapper.insert(entity);
saveFieldOpt(entity, request.getFieldList());
if (request.getEnableSyncSchema()) {
syncSourceFieldInfo(request, operator);
}
return entity.getId();
}

Expand Down Expand Up @@ -321,4 +324,10 @@ protected String getSerializationType(StreamSource streamSource, String streamDa

return DataTypeEnum.forType(streamDataType).getType();
}

@Override
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
public void syncSourceFieldInfo(SourceRequest request, String operator) {
LOGGER.info("not support sync source field info for type ={}", request.getSourceType());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -116,4 +116,12 @@ default Map<String, List<StreamSource>> getSourcesMap(InlongGroupInfo groupInfo,
*/
void restartOpt(SourceRequest request, String operator);

/**
* Sync the source field info to stream fields.
*
* @param request request of source
* @param operator operator
*/
void syncSourceFieldInfo(SourceRequest request, String operator);

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,23 +17,34 @@

package org.apache.inlong.manager.service.source.iceberg;

import org.apache.inlong.manager.common.consts.InlongConstants;
import org.apache.inlong.manager.common.consts.SourceType;
import org.apache.inlong.manager.common.enums.ErrorCodeEnum;
import org.apache.inlong.manager.common.exceptions.BusinessException;
import org.apache.inlong.manager.common.util.CommonBeanUtils;
import org.apache.inlong.manager.dao.entity.InlongStreamFieldEntity;
import org.apache.inlong.manager.dao.entity.StreamSourceEntity;
import org.apache.inlong.manager.pojo.sink.iceberg.IcebergColumnInfo;
import org.apache.inlong.manager.pojo.sort.util.FieldInfoUtils;
import org.apache.inlong.manager.pojo.source.SourceRequest;
import org.apache.inlong.manager.pojo.source.StreamSource;
import org.apache.inlong.manager.pojo.source.iceberg.IcebergSource;
import org.apache.inlong.manager.pojo.source.iceberg.IcebergSourceDTO;
import org.apache.inlong.manager.pojo.source.iceberg.IcebergSourceRequest;
import org.apache.inlong.manager.pojo.stream.StreamField;
import org.apache.inlong.manager.service.resource.sink.iceberg.IcebergCatalogUtils;
import org.apache.inlong.manager.service.source.AbstractSourceOperator;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.commons.collections.CollectionUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Isolation;
import org.springframework.transaction.annotation.Transactional;

import java.util.ArrayList;
import java.util.List;

/**
Expand All @@ -42,6 +53,8 @@
@Service
public class IcebergSourceOperator extends AbstractSourceOperator {

private static final Logger LOGGER = LoggerFactory.getLogger(IcebergSourceOperator.class);

@Autowired
private ObjectMapper objectMapper;

Expand Down Expand Up @@ -83,4 +96,52 @@ public StreamSource getFromEntity(StreamSourceEntity entity) {
source.setFieldList(sourceFields);
return source;
}

@Override
@Transactional(rollbackFor = Throwable.class, isolation = Isolation.REPEATABLE_READ)
public void syncSourceFieldInfo(SourceRequest request, String operator) {
IcebergSourceRequest sourceRequest = (IcebergSourceRequest) request;

LOGGER.info("sync source field for iceberg {}", sourceRequest);
String metastoreUri = sourceRequest.getUri();
String dbName = sourceRequest.getDatabase();
String tableName = sourceRequest.getTableName();
boolean tableExists = IcebergCatalogUtils.tableExists(metastoreUri, dbName, tableName);
List<StreamField> streamFields = new ArrayList<>();
if (tableExists) {
List<IcebergColumnInfo> existColumns = IcebergCatalogUtils.getColumns(metastoreUri, dbName, tableName);
for (IcebergColumnInfo columnInfo : existColumns) {
StreamField streamField = new StreamField();
streamField.setFieldName(columnInfo.getName());
streamField.setFieldType(FieldInfoUtils.sqlTypeToJavaTypeStr(columnInfo.getType()));
streamField.setFieldComment(columnInfo.getDesc());
streamFields.add(streamField);
}
updateField(sourceRequest.getInlongGroupId(), sourceRequest.getInlongStreamId(), streamFields);
}
}

public void updateField(String groupId, String streamId, List<StreamField> fieldList) {
LOGGER.debug("begin to update inlong stream field, groupId={}, streamId={}, field={}", groupId, streamId,
fieldList);
try {
streamFieldMapper.deleteAllByIdentifier(groupId, streamId);
if (CollectionUtils.isEmpty(fieldList)) {
return;
}
fieldList.forEach(streamField -> streamField.setId(null));
List<InlongStreamFieldEntity> list = CommonBeanUtils.copyListProperties(fieldList,
InlongStreamFieldEntity::new);
for (InlongStreamFieldEntity entity : list) {
entity.setInlongGroupId(groupId);
entity.setInlongStreamId(streamId);
entity.setIsDeleted(InlongConstants.UN_DELETED);
}
streamFieldMapper.insertAll(list);
LOGGER.info("success to update inlong stream field for groupId={}", groupId);
} catch (Exception e) {
LOGGER.error("failed to update inlong stream field: ", e);
throw new BusinessException(ErrorCodeEnum.STREAM_FIELD_SAVE_FAILED);
}
}
}

0 comments on commit d7459a2

Please sign in to comment.