Skip to content

Commit

Permalink
[davinci] Fix a NullPointerException when SIT processes a consumer ac…
Browse files Browse the repository at this point in the history
…tion (#1346)

* [davinci] Fix a NullPointerException when SIT processes a consumer action

A NPE can happen when:

1. A store received standby to leader transition, but yet finished, thus VeniceWriterLazyRef remains null.
2. Later, it receives LEADER to STANDBY transition and today's code can hit this NPE.

This fix add a NPE check and also mark VeniceWriterLazyRef to volatile for visibility across threads.
  • Loading branch information
lluwm authored Nov 26, 2024
1 parent aa1ccf7 commit b2d29b8
Show file tree
Hide file tree
Showing 3 changed files with 118 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -548,9 +548,18 @@ protected void processConsumerAction(ConsumerAction message, Store store) throws

/**
* Close the writer to make sure the current segment is closed after the leader is demoted to standby.
*
* An NPE can happen if:
* 1. A partition receives STANDBY to LEADER transition, but not yet fully finished to set veniceWriterLazyRef
* in PCS (e.g. still in IN_TRANSITION_FROM_STANDBY_TO_LEADER).
* 2. Then it receives LEADER to STANDBY transition and veniceWriterLazyRef is still null in PCS.
*/
// If the VeniceWriter doesn't exist, then no need to end any segment, and this function becomes a no-op
partitionConsumptionState.getVeniceWriterLazyRef().ifPresent(vw -> vw.closePartition(partition));
Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazyRef =
partitionConsumptionState.getVeniceWriterLazyRef();
if (veniceWriterLazyRef != null) {
veniceWriterLazyRef.ifPresent(vw -> vw.closePartition(partition));
}
break;
default:
processCommonConsumerAction(message);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,7 +212,8 @@ public class PartitionConsumptionState {

private List<String> pendingReportIncPushVersionList;

private Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazyRef;
// veniceWriterLazyRef could be set and get in different threads, mark it volatile.
private volatile Lazy<VeniceWriter<byte[], byte[], byte[]>> veniceWriterLazyRef;

public PartitionConsumptionState(String replicaId, int partition, OffsetRecord offsetRecord, boolean hybrid) {
this.replicaId = replicaId;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -3,18 +3,45 @@
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import static org.testng.Assert.assertFalse;
import static org.testng.Assert.assertTrue;

import com.linkedin.davinci.config.VeniceServerConfig;
import com.linkedin.davinci.config.VeniceStoreVersionConfig;
import com.linkedin.davinci.helix.LeaderFollowerPartitionStateModel;
import com.linkedin.davinci.storage.StorageService;
import com.linkedin.venice.meta.Store;
import com.linkedin.venice.meta.Version;
import com.linkedin.venice.offsets.OffsetRecord;
import com.linkedin.venice.pubsub.PubSubTopicRepository;
import com.linkedin.venice.pubsub.api.PubSubTopicPartition;
import com.linkedin.venice.utils.TestUtils;
import com.linkedin.venice.utils.Utils;
import com.linkedin.venice.utils.lazy.Lazy;
import com.linkedin.venice.writer.VeniceWriter;
import it.unimi.dsi.fastutil.objects.Object2IntMaps;
import java.util.HashMap;
import java.util.Map;
import java.util.Optional;
import java.util.Properties;
import java.util.function.BooleanSupplier;
import org.testng.annotations.Test;


public class LeaderFollowerStoreIngestionTaskTest {
Store mockStore;
private LeaderFollowerStoreIngestionTask leaderFollowerStoreIngestionTask;
private PartitionConsumptionState mockPartitionConsumptionState;
private PubSubTopicPartition mockTopicPartition;
private ConsumerAction mockConsumerAction;
private StorageService mockStorageService;
private Properties mockProperties;
private BooleanSupplier mockBooleanSupplier;
private VeniceStoreVersionConfig mockVeniceStoreVersionConfig;

@Test
public void testCheckWhetherToCloseUnusedVeniceWriter() {
VeniceWriter<byte[], byte[], byte[]> writer1 = mock(VeniceWriter.class);
Expand Down Expand Up @@ -113,4 +140,83 @@ public void testCheckWhetherToCloseUnusedVeniceWriter() {
versionTopicName));
verify(runnable, never()).run();
}

public void setUp() throws InterruptedException {
String storeName = Utils.getUniqueString("store");
int versionNumber = 1;
mockStorageService = mock(StorageService.class);
VeniceServerConfig mockVeniceServerConfig = mock(VeniceServerConfig.class);
doReturn(Object2IntMaps.emptyMap()).when(mockVeniceServerConfig).getKafkaClusterUrlToIdMap();
PubSubTopicRepository pubSubTopicRepository = new PubSubTopicRepository();
StoreIngestionTaskFactory.Builder builder = TestUtils.getStoreIngestionTaskBuilder(storeName)
.setServerConfig(mockVeniceServerConfig)
.setPubSubTopicRepository(pubSubTopicRepository);
mockStore = builder.getMetadataRepo().getStoreOrThrow(storeName);
Version version = mockStore.getVersion(versionNumber);

mockPartitionConsumptionState = mock(PartitionConsumptionState.class);
mockConsumerAction = mock(ConsumerAction.class);

mockProperties = new Properties();
mockBooleanSupplier = mock(BooleanSupplier.class);
mockVeniceStoreVersionConfig = mock(VeniceStoreVersionConfig.class);
String versionTopic = version.kafkaTopicName();
doReturn(versionTopic).when(mockVeniceStoreVersionConfig).getStoreVersionName();

leaderFollowerStoreIngestionTask = new LeaderFollowerStoreIngestionTask(
mockStorageService,
builder,
mockStore,
version,
mockProperties,
mockBooleanSupplier,
mockVeniceStoreVersionConfig,
0,
false,
Optional.empty(),
null);
leaderFollowerStoreIngestionTask.addPartitionConsumptionState(0, mockPartitionConsumptionState);
}

/**
* Test veniceWriterLazyRef in PartitionConsumptionState can handle NPE in processConsumerAction.
*
* 1. No VeniceWriter is set in PCS, processConsumerAction doesn't have NPE thrown.
* 2. VeniceWriter is set, but not initialized, closePartition is not invoked.
* 3. VeniceWriter is set and initialized. closePartition is invoked once.
*/
@Test
public void testVeniceWriterInProcessConsumerAction() throws InterruptedException {
setUp();
when(mockConsumerAction.getType()).thenReturn(ConsumerActionType.LEADER_TO_STANDBY);
when(mockConsumerAction.getTopic()).thenReturn("test-topic");
when(mockConsumerAction.getPartition()).thenReturn(0);
LeaderFollowerPartitionStateModel.LeaderSessionIdChecker mockLeaderSessionIdChecker =
mock(LeaderFollowerPartitionStateModel.LeaderSessionIdChecker.class);
when(mockConsumerAction.getLeaderSessionIdChecker()).thenReturn(mockLeaderSessionIdChecker);
when(mockLeaderSessionIdChecker.isSessionIdValid()).thenReturn(true);
mockTopicPartition = mock(PubSubTopicPartition.class);
OffsetRecord mockOffsetRecord = mock(OffsetRecord.class);
when(mockConsumerAction.getTopicPartition()).thenReturn(mockTopicPartition);
when(mockPartitionConsumptionState.getOffsetRecord()).thenReturn(mockOffsetRecord);

// case 1: No VeniceWriter is set in PCS, processConsumerAction doesn't have NPE.
when(mockPartitionConsumptionState.getVeniceWriterLazyRef()).thenReturn(null);
when(mockPartitionConsumptionState.getLeaderFollowerState()).thenReturn(LeaderFollowerStateType.LEADER);

leaderFollowerStoreIngestionTask.processConsumerAction(mockConsumerAction, mockStore);
verify(mockPartitionConsumptionState, times(1)).setLeaderFollowerState(LeaderFollowerStateType.STANDBY);

// case 2: VeniceWriter is set, but not initialized, closePartition is not invoked.
VeniceWriter mockWriter = mock(VeniceWriter.class);
Lazy<VeniceWriter<byte[], byte[], byte[]>> lazyMockWriter = Lazy.of(() -> mockWriter);
when(mockPartitionConsumptionState.getVeniceWriterLazyRef()).thenReturn(lazyMockWriter);
leaderFollowerStoreIngestionTask.processConsumerAction(mockConsumerAction, mockStore);
verify(mockWriter, times(0)).closePartition(0);

// case 3: VeniceWriter is set and initialized. closePartition is invoked once.
lazyMockWriter.get();
leaderFollowerStoreIngestionTask.processConsumerAction(mockConsumerAction, mockStore);
verify(mockWriter, times(1)).closePartition(0);
}
}

0 comments on commit b2d29b8

Please sign in to comment.