diff --git a/postgres-protocol/src/message/backend.rs b/postgres-protocol/src/message/backend.rs index b6883cc3c..10dec86ca 100644 --- a/postgres-protocol/src/message/backend.rs +++ b/postgres-protocol/src/message/backend.rs @@ -39,6 +39,7 @@ pub const READY_FOR_QUERY_TAG: u8 = b'Z'; // replication message tags pub const XLOG_DATA_TAG: u8 = b'w'; pub const PRIMARY_KEEPALIVE_TAG: u8 = b'k'; +pub const INTERPRETED_WAL_RECORD_TAG: u8 = b'0'; // logical replication message tags const BEGIN_TAG: u8 = b'B'; @@ -325,6 +326,7 @@ impl Message { pub enum ReplicationMessage { XLogData(XLogDataBody), PrimaryKeepAlive(PrimaryKeepAliveBody), + RawInterpretedWalRecords(RawInterpretedWalRecordsBody), } impl ReplicationMessage { @@ -370,6 +372,21 @@ impl ReplicationMessage { reply, }) } + INTERPRETED_WAL_RECORD_TAG => { + let streaming_lsn = buf.read_u64::()?; + let commit_lsn = buf.read_u64::()?; + let next_record_lsn = match buf.read_u64::()? { + 0 => None, + lsn => Some(lsn), + }; + + ReplicationMessage::RawInterpretedWalRecords(RawInterpretedWalRecordsBody { + streaming_lsn, + commit_lsn, + next_record_lsn, + data: buf.read_all(), + }) + } tag => { return Err(io::Error::new( io::ErrorKind::InvalidInput, @@ -950,6 +967,36 @@ impl XLogDataBody { } } +#[derive(Debug)] +pub struct RawInterpretedWalRecordsBody { + streaming_lsn: u64, + commit_lsn: u64, + next_record_lsn: Option, + data: D, +} + +impl RawInterpretedWalRecordsBody { + #[inline] + pub fn streaming_lsn(&self) -> u64 { + self.streaming_lsn + } + + #[inline] + pub fn commit_lsn(&self) -> u64 { + self.commit_lsn + } + + #[inline] + pub fn next_record_lsn(&self) -> Option { + self.next_record_lsn + } + + #[inline] + pub fn data(&self) -> &D { + &self.data + } +} + #[derive(Debug)] pub struct PrimaryKeepAliveBody { wal_end: u64,