From a070b76b27a1099dce3a7b423d73e7ccfca44dbe Mon Sep 17 00:00:00 2001
From: min-96 <ajdcnd131@naver.com>
Date: Tue, 27 Aug 2024 15:16:25 +0900
Subject: [PATCH 1/4] feat: add LEDGER of StatusType

---
 src/main/kotlin/com/api/nft/controller/NftController.kt     | 6 ++++++
 src/main/kotlin/com/api/nft/enums/Enums.kt                  | 2 +-
 .../kotlin/com/api/nft/service/api/NftAuctionService.kt     | 2 +-
 .../kotlin/com/api/nft/service/api/NftListingService.kt     | 2 +-
 .../db/postgresql/migration/V1__Initial_schema.sql          | 3 ++-
 5 files changed, 11 insertions(+), 4 deletions(-)

diff --git a/src/main/kotlin/com/api/nft/controller/NftController.kt b/src/main/kotlin/com/api/nft/controller/NftController.kt
index d649da6..1e682a8 100644
--- a/src/main/kotlin/com/api/nft/controller/NftController.kt
+++ b/src/main/kotlin/com/api/nft/controller/NftController.kt
@@ -56,6 +56,12 @@ class NftController(
         return nftService.getByWalletNft(wallet,chainType)
 
     }
+    // collection을 구현할건데 1h / 7h /12h / 24h / 7d/ 30d
+    // Listing은 matched volume 이 높은순이여야돼
+
+
+
+    // aution은 현재 acution중인것
 
 
 }
\ No newline at end of file
diff --git a/src/main/kotlin/com/api/nft/enums/Enums.kt b/src/main/kotlin/com/api/nft/enums/Enums.kt
index 7dde924..823dd98 100644
--- a/src/main/kotlin/com/api/nft/enums/Enums.kt
+++ b/src/main/kotlin/com/api/nft/enums/Enums.kt
@@ -25,4 +25,4 @@ enum class TokenType {
         SAND, MATIC, ETH, BTC
 }
 
-enum class StatusType { RESERVATION, ACTIVED, RESERVATION_CANCEL, CANCEL, EXPIRED,LISTING, AUCTION }
\ No newline at end of file
+enum class StatusType { RESERVATION, ACTIVED, RESERVATION_CANCEL, CANCEL, EXPIRED,LISTING, AUCTION,LEDGER }
\ No newline at end of file
diff --git a/src/main/kotlin/com/api/nft/service/api/NftAuctionService.kt b/src/main/kotlin/com/api/nft/service/api/NftAuctionService.kt
index 78dae6d..58102fd 100644
--- a/src/main/kotlin/com/api/nft/service/api/NftAuctionService.kt
+++ b/src/main/kotlin/com/api/nft/service/api/NftAuctionService.kt
@@ -23,7 +23,7 @@ class NftAuctionService(
                         nftAuctionRepository.updateAuction(nftId = newAuction.nftId, statusType = StatusType.AUCTION)
                     }
             }
-            StatusType.RESERVATION_CANCEL, StatusType.CANCEL, StatusType.EXPIRED -> {
+            StatusType.RESERVATION_CANCEL, StatusType.CANCEL, StatusType.EXPIRED, StatusType.LEDGER -> {
                 nftAuctionRepository.findByNftId(newAuction.nftId)
                     .flatMap { nftAuction ->
                         nftAuctionRepository.deleteByNftId(nftAuction.nftId)
diff --git a/src/main/kotlin/com/api/nft/service/api/NftListingService.kt b/src/main/kotlin/com/api/nft/service/api/NftListingService.kt
index d40058c..1741c41 100644
--- a/src/main/kotlin/com/api/nft/service/api/NftListingService.kt
+++ b/src/main/kotlin/com/api/nft/service/api/NftListingService.kt
@@ -27,7 +27,7 @@ class NftListingService(
                     }
                     .then(redisService.updateToRedis(newListing.nftId))
             }
-            StatusType.RESERVATION_CANCEL, StatusType.CANCEL, StatusType.EXPIRED -> {
+            StatusType.RESERVATION_CANCEL, StatusType.CANCEL, StatusType.EXPIRED, StatusType.LEDGER -> {
                 nftListingRepository.findByNftId(newListing.nftId)
                     .flatMap { nftListing ->
                         nftListingRepository.deleteByNftId(nftListing.nftId)
diff --git a/src/main/resources/db/postgresql/migration/V1__Initial_schema.sql b/src/main/resources/db/postgresql/migration/V1__Initial_schema.sql
index bb8cbbc..c685dc5 100644
--- a/src/main/resources/db/postgresql/migration/V1__Initial_schema.sql
+++ b/src/main/resources/db/postgresql/migration/V1__Initial_schema.sql
@@ -24,7 +24,8 @@ CREATE TYPE token_type AS ENUM (
 CREATE TYPE status_type AS ENUM (
     'RESERVATION',
     'LISTING',
-    'AUCTION'
+    'AUCTION',
+    'LEDGER'
     );
 
 

From 1c4baa7afa29d5f5e34e182fd5a58550aebb4c11 Mon Sep 17 00:00:00 2001
From: min-96 <ajdcnd131@naver.com>
Date: Sun, 1 Sep 2024 19:31:33 +0900
Subject: [PATCH 2/4] fix: add collectionLogo to NftMetadataResponse

---
 .../kotlin/com/api/nft/config/RedisConfig.kt  |  5 ++-
 .../nft/controller/dto/NftMetadataResponse.kt | 24 ++++++++++-
 .../repository/NftRepositorySupportImpl.kt    | 43 +++++--------------
 src/test/kotlin/com/api/nft/NftTest.kt        |  8 ++++
 4 files changed, 46 insertions(+), 34 deletions(-)

diff --git a/src/main/kotlin/com/api/nft/config/RedisConfig.kt b/src/main/kotlin/com/api/nft/config/RedisConfig.kt
index 67f1409..93b6d9a 100644
--- a/src/main/kotlin/com/api/nft/config/RedisConfig.kt
+++ b/src/main/kotlin/com/api/nft/config/RedisConfig.kt
@@ -1,5 +1,8 @@
 package com.api.nft.config
 
+import com.fasterxml.jackson.databind.ObjectMapper
+import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator
+import com.fasterxml.jackson.module.kotlin.KotlinModule
 import org.springframework.context.annotation.Bean
 import org.springframework.context.annotation.Configuration
 import org.springframework.data.redis.connection.RedisClusterConfiguration
@@ -43,7 +46,7 @@ class RedisConfig {
                     .autoReconnect(true)
                     .pingBeforeActivateConnection(true)
                     .build()
-            )            
+            )
             .build()
         redisClusterConfiguration.password = RedisPassword.of("bitnami")
         redisClusterConfiguration.maxRedirects = 3
diff --git a/src/main/kotlin/com/api/nft/controller/dto/NftMetadataResponse.kt b/src/main/kotlin/com/api/nft/controller/dto/NftMetadataResponse.kt
index 42be222..36c9a3d 100644
--- a/src/main/kotlin/com/api/nft/controller/dto/NftMetadataResponse.kt
+++ b/src/main/kotlin/com/api/nft/controller/dto/NftMetadataResponse.kt
@@ -2,6 +2,8 @@ package com.api.nft.controller.dto
 
 import com.api.nft.enums.ChainType
 import com.api.nft.enums.ContractType
+import io.r2dbc.spi.Row
+import java.math.BigDecimal
 
 
 data class NftMetadataResponse(
@@ -14,4 +16,24 @@ data class NftMetadataResponse(
     val collectionName: String,
     val image: String,
     val lastPrice: Double?,
-)
+    val collectionLogo: String?
+){
+    companion object {
+        fun fromRow(row: Row): NftMetadataResponse {
+            val idValue = row.get("id")
+            println("ID value type: ${idValue?.javaClass}, value: $idValue")
+            return NftMetadataResponse(
+                id = row.get("id", Integer::class.java)!!.toLong(),
+                tokenId = row.get("tokenid", String::class.java)!!,
+                tokenAddress = row.get("tokenaddress", String::class.java)!!,
+                chainType = ChainType.valueOf(row.get("chaintype", String::class.java)!!),
+                nftName = row.get("nftname", String::class.java)!!,
+                collectionName = row.get("collectionname", String::class.java)!!,
+                image = row.get("image", String::class.java) ?: "",
+                contractType = ContractType.valueOf(row.get("contracttype", String::class.java)!!),
+                lastPrice = row.get("lastprice", BigDecimal::class.java)?.toDouble(),
+                collectionLogo = row.get("collectionlogo", String::class.java) ?: ""
+            )
+        }
+    }
+}
diff --git a/src/main/kotlin/com/api/nft/domain/nft/repository/NftRepositorySupportImpl.kt b/src/main/kotlin/com/api/nft/domain/nft/repository/NftRepositorySupportImpl.kt
index da84206..c76c1b4 100644
--- a/src/main/kotlin/com/api/nft/domain/nft/repository/NftRepositorySupportImpl.kt
+++ b/src/main/kotlin/com/api/nft/domain/nft/repository/NftRepositorySupportImpl.kt
@@ -1,12 +1,9 @@
 package com.api.nft.domain.nft.repository
 
 import com.api.nft.controller.dto.NftMetadataResponse
-import com.api.nft.enums.ChainType
-import com.api.nft.enums.ContractType
 import org.springframework.data.r2dbc.core.R2dbcEntityTemplate
 import reactor.core.publisher.Flux
 import reactor.core.publisher.Mono
-import java.math.BigDecimal
 
 class NftRepositorySupportImpl(
     private val r2dbcEntityTemplate: R2dbcEntityTemplate
@@ -22,9 +19,12 @@ class NftRepositorySupportImpl(
             n.collection_name AS collectionName,
             n.contract_type AS contractType,
             m.image AS image,
-            nl.price AS lastPrice
+            nl.price AS lastPrice,
+            c.logo AS collectionLogo
         FROM 
             nft n
+        LEFT JOIN 
+            collection c ON n.collection_name = c.name    
         LEFT JOIN 
             metadata m ON n.id = m.nft_id
         LEFT JOIN 
@@ -32,22 +32,10 @@ class NftRepositorySupportImpl(
         WHERE 
             n.id = :$1
     """
-
         return r2dbcEntityTemplate.databaseClient.sql(query)
             .bind(0, id)
-            .map { row, data ->
-                NftMetadataResponse(
-                    id = (row.get("id") as Number).toLong(),
-                    tokenId = row.get("tokenId", String::class.java)!!,
-                    tokenAddress = row.get("tokenAddress", String::class.java)!!,
-                    chainType = row.get("chainType", ChainType::class.java)!!,
-                    nftName = row.get("nftName", String::class.java)!!,
-                    collectionName = row.get("collectionName", String::class.java)!!,
-                    image = row.get("image", String::class.java) ?: "",
-                    contractType = row.get("contractType", ContractType::class.java)!!,
-                    lastPrice = row.get("lastPrice", BigDecimal::class.java)?.toDouble(),
-                )
-            }.first()
+            .map { row, _ -> NftMetadataResponse.fromRow(row) }
+            .first()
     }
 
     override fun findAllByNftJoinMetadata(ids: List<Long>): Flux<NftMetadataResponse> {
@@ -63,27 +51,18 @@ class NftRepositorySupportImpl(
             m.image AS image
         FROM 
             nft n
+        LEFT JOIN 
+            collection c ON n.collection_name = c.name    
         LEFT JOIN 
             metadata m ON n.id = m.nft_id
-        LEFT JOIN nft_listing nl ON nft.id = nl.nft_id    
+        LEFT JOIN 
+            nft_listing nl ON n.id = nl.nft_id    
         WHERE 
             n.id IN (:$1)
     """
         return r2dbcEntityTemplate.databaseClient.sql(query)
             .bind(0, ids)
-            .map { row, metadata ->
-                NftMetadataResponse(
-                    id = (row.get("id") as Number).toLong(),
-                    tokenId = row.get("tokenId", String::class.java)!!,
-                    tokenAddress = row.get("tokenAddress", String::class.java)!!,
-                    chainType = row.get("chainType", ChainType::class.java)!!,
-                    nftName = row.get("nftName", String::class.java)!!,
-                    collectionName = row.get("collectionName", String::class.java)!!,
-                    image = row.get("image", String::class.java) ?: "",
-                    contractType = row.get("contractType", ContractType::class.java)!!,
-                    lastPrice = row.get("lastPrice", BigDecimal::class.java)?.toDouble(),
-                )
-            }
+            .map { row, _ -> NftMetadataResponse.fromRow(row) }
             .all()
     }
 
diff --git a/src/test/kotlin/com/api/nft/NftTest.kt b/src/test/kotlin/com/api/nft/NftTest.kt
index f080130..28bc5c2 100644
--- a/src/test/kotlin/com/api/nft/NftTest.kt
+++ b/src/test/kotlin/com/api/nft/NftTest.kt
@@ -22,10 +22,12 @@ import org.junit.jupiter.api.Test
 import org.springframework.beans.factory.annotation.Autowired
 import org.springframework.boot.test.context.SpringBootTest
 import org.springframework.context.ApplicationEventPublisher
+import org.springframework.test.context.ActiveProfiles
 import reactor.test.StepVerifier
 import java.math.BigDecimal
 
 @SpringBootTest
+@ActiveProfiles("local")
 class NftTest(
     @Autowired private val moralisApiService: MoralisApiService,
     @Autowired private val nftService: NftService,
@@ -169,4 +171,10 @@ class NftTest(
         println(res.toString())
     }
 
+   @Test
+   fun test1() {
+       val nft = nftRepository.findByNftJoinMetadata(10L).block()
+       println(nft.toString())
+   }
+
 }
\ No newline at end of file

From 4b423849802eca102f785b3121162372369659d9 Mon Sep 17 00:00:00 2001
From: min-96 <ajdcnd131@naver.com>
Date: Tue, 3 Sep 2024 21:06:18 +0900
Subject: [PATCH 3/4] apply kafka

---
 build.gradle.kts                              |  1 +
 .../kotlin/com/api/nft/config/KafkaConfig.kt  | 65 +++++++++++++++++++
 src/main/kotlin/com/api/nft/enums/Enums.kt    |  3 +-
 .../kotlin/com/api/nft/kafka/KafkaConsumer.kt | 28 ++++++++
 .../com/api/nft/kafka/dto/SaleResponse.kt     | 20 ++++++
 5 files changed, 116 insertions(+), 1 deletion(-)
 create mode 100644 src/main/kotlin/com/api/nft/config/KafkaConfig.kt
 create mode 100644 src/main/kotlin/com/api/nft/kafka/KafkaConsumer.kt
 create mode 100644 src/main/kotlin/com/api/nft/kafka/dto/SaleResponse.kt

diff --git a/build.gradle.kts b/build.gradle.kts
index 78deaab..cae33f4 100644
--- a/build.gradle.kts
+++ b/build.gradle.kts
@@ -27,6 +27,7 @@ dependencies {
     implementation("org.springframework.boot:spring-boot-starter-batch")
     implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
     implementation("org.postgresql:r2dbc-postgresql:1.0.4.RELEASE")
+    implementation("org.springframework.kafka:spring-kafka")
     implementation("org.postgresql:postgresql:42.7.3")
 
     implementation("org.springframework.boot:spring-boot-starter-web")
diff --git a/src/main/kotlin/com/api/nft/config/KafkaConfig.kt b/src/main/kotlin/com/api/nft/config/KafkaConfig.kt
new file mode 100644
index 0000000..87c0722
--- /dev/null
+++ b/src/main/kotlin/com/api/nft/config/KafkaConfig.kt
@@ -0,0 +1,65 @@
+package com.api.nft.config
+
+import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.clients.consumer.ConsumerConfig
+import org.apache.kafka.common.serialization.StringDeserializer
+import org.slf4j.LoggerFactory
+import org.springframework.beans.factory.annotation.Value
+import org.springframework.context.annotation.Bean
+import org.springframework.context.annotation.Configuration
+import org.springframework.kafka.annotation.EnableKafka
+import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
+import org.springframework.kafka.core.ConsumerFactory
+import org.springframework.kafka.core.DefaultKafkaConsumerFactory
+import org.springframework.kafka.core.KafkaAdmin
+import org.springframework.kafka.listener.CommonErrorHandler
+import org.springframework.kafka.listener.ContainerProperties
+import org.springframework.kafka.listener.MessageListenerContainer
+import org.springframework.kafka.support.serializer.JsonDeserializer
+
+@Configuration
+@EnableKafka
+class KafkaConfig {
+    private val logger = LoggerFactory.getLogger(KafkaConfig::class.java)
+
+    @Value("\${spring.kafka.bootstrap-servers}")
+    private lateinit var bootstrapServers: String
+
+    @Bean
+    fun kafkaAdmin(): KafkaAdmin = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers))
+
+    @Bean
+    fun consumerFactory(): ConsumerFactory<String, Any> {
+        val props =
+            mapOf(
+                ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
+                ConsumerConfig.GROUP_ID_CONFIG to "wallet-group",
+                ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java.name,
+                ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java.name,
+                JsonDeserializer.TRUSTED_PACKAGES to "*",
+                JsonDeserializer.VALUE_DEFAULT_TYPE to Any::class.java.name,
+            )
+        return DefaultKafkaConsumerFactory(props, StringDeserializer(), JsonDeserializer(Any::class.java, false))
+    }
+
+    @Bean
+    fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> {
+        val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
+        factory.consumerFactory = consumerFactory()
+        factory.setConcurrency(4)
+        factory.setCommonErrorHandler(
+            object : CommonErrorHandler {
+                override fun handleRemaining(
+                    thrownException: Exception,
+                    records: List<org.apache.kafka.clients.consumer.ConsumerRecord<*, *>>,
+                    consumer: org.apache.kafka.clients.consumer.Consumer<*, *>,
+                    container: MessageListenerContainer,
+                ) {
+                    logger.error("Error in consumer: ${thrownException.message}", thrownException)
+                    logger.error("Problematic records: $records")
+                }
+            },
+        )
+        return factory
+    }
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/api/nft/enums/Enums.kt b/src/main/kotlin/com/api/nft/enums/Enums.kt
index 823dd98..39df512 100644
--- a/src/main/kotlin/com/api/nft/enums/Enums.kt
+++ b/src/main/kotlin/com/api/nft/enums/Enums.kt
@@ -11,6 +11,7 @@ enum class ChainType{
 
 }
 
+enum class OrderType { LISTING, AUCTION }
 
 enum class NetworkType{
         ETHEREUM,
@@ -25,4 +26,4 @@ enum class TokenType {
         SAND, MATIC, ETH, BTC
 }
 
-enum class StatusType { RESERVATION, ACTIVED, RESERVATION_CANCEL, CANCEL, EXPIRED,LISTING, AUCTION,LEDGER }
\ No newline at end of file
+enum class StatusType { RESERVATION, ACTIVED, RESERVATION_CANCEL, CANCEL, EXPIRED,LISTING, AUCTION,LEDGER }
diff --git a/src/main/kotlin/com/api/nft/kafka/KafkaConsumer.kt b/src/main/kotlin/com/api/nft/kafka/KafkaConsumer.kt
new file mode 100644
index 0000000..c42a3e5
--- /dev/null
+++ b/src/main/kotlin/com/api/nft/kafka/KafkaConsumer.kt
@@ -0,0 +1,28 @@
+package com.api.nft.kafka
+
+import com.api.nft.kafka.dto.SaleResponse
+import com.fasterxml.jackson.databind.ObjectMapper
+import org.springframework.kafka.annotation.KafkaListener
+import org.springframework.messaging.Message
+import org.springframework.stereotype.Service
+
+@Service
+class KafkaConsumer(
+    private val objectMapper: ObjectMapper,
+) {
+    @KafkaListener(topics = ["sale-topic"],
+        groupId = "nft-group",
+        containerFactory = "kafkaListenerContainerFactory"
+    )
+    fun consumeLedgerStatusEvents(message: Message<Any>) {
+        val payload = message.payload
+
+        if (payload is LinkedHashMap<*, *>) {
+            val saleStatusRequest = objectMapper.convertValue(payload, SaleResponse::class.java)
+
+            println("saleStatusRequest : " + saleStatusRequest)
+            // orderService.updateOrderStatus(ledgerStatusRequest).subscribe()
+        }
+    }
+
+}
\ No newline at end of file
diff --git a/src/main/kotlin/com/api/nft/kafka/dto/SaleResponse.kt b/src/main/kotlin/com/api/nft/kafka/dto/SaleResponse.kt
new file mode 100644
index 0000000..3b6f227
--- /dev/null
+++ b/src/main/kotlin/com/api/nft/kafka/dto/SaleResponse.kt
@@ -0,0 +1,20 @@
+package com.api.nft.kafka.dto
+
+import com.api.nft.enums.ChainType
+import com.api.nft.enums.StatusType
+import com.api.nft.enums.OrderType
+import java.math.BigDecimal
+
+data class SaleResponse(
+    val id : Long,
+    val nftId : Long,
+    val address: String,
+    val createdDateTime: Long,
+    val endDateTime: Long,
+    val statusType: StatusType,
+    val startingPrice: BigDecimal,
+    val chainType: ChainType,
+    val orderType: OrderType
+)
+
+

From 27ffe01971e707468aae78f37641416cff91cfca Mon Sep 17 00:00:00 2001
From: min-96 <ajdcnd131@naver.com>
Date: Thu, 5 Sep 2024 00:30:05 +0900
Subject: [PATCH 4/4] feat: sale topic

---
 .../kotlin/com/api/nft/config/KafkaConfig.kt  |  5 +--
 .../kotlin/com/api/nft/config/RabbitConfig.kt |  8 -----
 .../kotlin/com/api/nft/kafka/KafkaConsumer.kt | 11 +++++-
 .../com/api/nft/kafka/dto/SaleResponse.kt     |  2 +-
 .../com/api/nft/rabbitMQ/RabbitMQReceiver.kt  | 36 -------------------
 .../api/nft/service/api/NftAuctionService.kt  |  7 ++--
 .../api/nft/service/api/NftListingService.kt  |  5 +--
 .../external/moralis/MoralisApiService.kt     |  2 +-
 8 files changed, 22 insertions(+), 54 deletions(-)
 delete mode 100644 src/main/kotlin/com/api/nft/rabbitMQ/RabbitMQReceiver.kt

diff --git a/src/main/kotlin/com/api/nft/config/KafkaConfig.kt b/src/main/kotlin/com/api/nft/config/KafkaConfig.kt
index 87c0722..5eac751 100644
--- a/src/main/kotlin/com/api/nft/config/KafkaConfig.kt
+++ b/src/main/kotlin/com/api/nft/config/KafkaConfig.kt
@@ -1,6 +1,7 @@
 package com.api.nft.config
 
 import org.apache.kafka.clients.admin.AdminClientConfig
+import org.apache.kafka.clients.admin.NewTopic
 import org.apache.kafka.clients.consumer.ConsumerConfig
 import org.apache.kafka.common.serialization.StringDeserializer
 import org.slf4j.LoggerFactory
@@ -9,11 +10,11 @@ import org.springframework.context.annotation.Bean
 import org.springframework.context.annotation.Configuration
 import org.springframework.kafka.annotation.EnableKafka
 import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
+import org.springframework.kafka.config.TopicBuilder
 import org.springframework.kafka.core.ConsumerFactory
 import org.springframework.kafka.core.DefaultKafkaConsumerFactory
 import org.springframework.kafka.core.KafkaAdmin
 import org.springframework.kafka.listener.CommonErrorHandler
-import org.springframework.kafka.listener.ContainerProperties
 import org.springframework.kafka.listener.MessageListenerContainer
 import org.springframework.kafka.support.serializer.JsonDeserializer
 
@@ -33,7 +34,7 @@ class KafkaConfig {
         val props =
             mapOf(
                 ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
-                ConsumerConfig.GROUP_ID_CONFIG to "wallet-group",
+                ConsumerConfig.GROUP_ID_CONFIG to "nft-group",
                 ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java.name,
                 ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java.name,
                 JsonDeserializer.TRUSTED_PACKAGES to "*",
diff --git a/src/main/kotlin/com/api/nft/config/RabbitConfig.kt b/src/main/kotlin/com/api/nft/config/RabbitConfig.kt
index ef5b415..b189c52 100644
--- a/src/main/kotlin/com/api/nft/config/RabbitConfig.kt
+++ b/src/main/kotlin/com/api/nft/config/RabbitConfig.kt
@@ -29,12 +29,4 @@ class RabbitConfig {
 
     @Bean
     fun nftExchange() = createFanoutExchange("nftExchange")
-
-
-    @Bean
-    fun listingExchange() = createFanoutExchange("listingExchange")
-
-
-     @Bean
-     fun auctionExchange() = createFanoutExchange("auctionExchange")
 }
\ No newline at end of file
diff --git a/src/main/kotlin/com/api/nft/kafka/KafkaConsumer.kt b/src/main/kotlin/com/api/nft/kafka/KafkaConsumer.kt
index c42a3e5..5f540fe 100644
--- a/src/main/kotlin/com/api/nft/kafka/KafkaConsumer.kt
+++ b/src/main/kotlin/com/api/nft/kafka/KafkaConsumer.kt
@@ -1,6 +1,9 @@
 package com.api.nft.kafka
 
+import com.api.nft.enums.OrderType
 import com.api.nft.kafka.dto.SaleResponse
+import com.api.nft.service.api.NftAuctionService
+import com.api.nft.service.api.NftListingService
 import com.fasterxml.jackson.databind.ObjectMapper
 import org.springframework.kafka.annotation.KafkaListener
 import org.springframework.messaging.Message
@@ -9,6 +12,8 @@ import org.springframework.stereotype.Service
 @Service
 class KafkaConsumer(
     private val objectMapper: ObjectMapper,
+    private val nftListingService: NftListingService,
+    private val nftAuctionService: NftAuctionService
 ) {
     @KafkaListener(topics = ["sale-topic"],
         groupId = "nft-group",
@@ -19,8 +24,12 @@ class KafkaConsumer(
 
         if (payload is LinkedHashMap<*, *>) {
             val saleStatusRequest = objectMapper.convertValue(payload, SaleResponse::class.java)
-
             println("saleStatusRequest : " + saleStatusRequest)
+            when(saleStatusRequest.orderType){
+                OrderType.LISTING -> nftListingService.update(saleStatusRequest).subscribe()
+                OrderType.AUCTION -> nftAuctionService.update(saleStatusRequest).subscribe()
+            }
+
             // orderService.updateOrderStatus(ledgerStatusRequest).subscribe()
         }
     }
diff --git a/src/main/kotlin/com/api/nft/kafka/dto/SaleResponse.kt b/src/main/kotlin/com/api/nft/kafka/dto/SaleResponse.kt
index 3b6f227..409d246 100644
--- a/src/main/kotlin/com/api/nft/kafka/dto/SaleResponse.kt
+++ b/src/main/kotlin/com/api/nft/kafka/dto/SaleResponse.kt
@@ -12,7 +12,7 @@ data class SaleResponse(
     val createdDateTime: Long,
     val endDateTime: Long,
     val statusType: StatusType,
-    val startingPrice: BigDecimal,
+    val price: BigDecimal,
     val chainType: ChainType,
     val orderType: OrderType
 )
diff --git a/src/main/kotlin/com/api/nft/rabbitMQ/RabbitMQReceiver.kt b/src/main/kotlin/com/api/nft/rabbitMQ/RabbitMQReceiver.kt
deleted file mode 100644
index 3dc84bf..0000000
--- a/src/main/kotlin/com/api/nft/rabbitMQ/RabbitMQReceiver.kt
+++ /dev/null
@@ -1,36 +0,0 @@
-package com.api.nft.rabbitMQ
-
-import com.api.nft.service.api.NftAuctionService
-import com.api.nft.service.api.NftListingService
-import com.api.nft.service.dto.ListingResponse
-import org.springframework.amqp.core.ExchangeTypes
-import org.springframework.amqp.rabbit.annotation.Exchange
-import org.springframework.amqp.rabbit.annotation.Queue
-import org.springframework.amqp.rabbit.annotation.QueueBinding
-import org.springframework.amqp.rabbit.annotation.RabbitListener
-import org.springframework.stereotype.Service
-import com.api.nft.service.dto.AuctionResponse as AuctionResponse
-
-@Service
-class RabbitMQReceiver(
-    private val nftListingService: NftListingService,
-    private val nftAuctionService: NftAuctionService
-) {
-
-
-    @RabbitListener(bindings = [QueueBinding(
-    value = Queue(name = "", durable = "false", exclusive = "true", autoDelete = "true"),
-    exchange = Exchange(value = "listingExchange", type = ExchangeTypes.FANOUT)
-    )])
-    fun listingMessage(listing: ListingResponse){
-        nftListingService.update(listing).subscribe()
-    }
-
-     @RabbitListener(bindings = [QueueBinding(
-         value = Queue(name = "", durable = "false", exclusive = "true", autoDelete = "true"),
-         exchange = Exchange(value = "auctionExchange", type = ExchangeTypes.FANOUT)
-     )])
-     fun auctionMessage(auction: AuctionResponse){
-         nftAuctionService.update(auction).subscribe()
-     }
-}
\ No newline at end of file
diff --git a/src/main/kotlin/com/api/nft/service/api/NftAuctionService.kt b/src/main/kotlin/com/api/nft/service/api/NftAuctionService.kt
index 58102fd..8fee8cb 100644
--- a/src/main/kotlin/com/api/nft/service/api/NftAuctionService.kt
+++ b/src/main/kotlin/com/api/nft/service/api/NftAuctionService.kt
@@ -3,6 +3,7 @@ package com.api.nft.service.api
 import com.api.nft.domain.nft.NftAuction
 import com.api.nft.domain.nft.repository.NftAuctionRepository
 import com.api.nft.enums.StatusType
+import com.api.nft.kafka.dto.SaleResponse
 import com.api.nft.service.dto.AuctionResponse
 import org.springframework.stereotype.Service
 import reactor.core.publisher.Mono
@@ -12,7 +13,7 @@ class NftAuctionService(
     private val nftAuctionRepository: NftAuctionRepository,
 ) {
 
-    fun update(newAuction: AuctionResponse): Mono<Void> {
+    fun update(newAuction: SaleResponse): Mono<Void> {
         return when (newAuction.statusType) {
             StatusType.RESERVATION -> {
                 save(newAuction)
@@ -33,12 +34,12 @@ class NftAuctionService(
             else -> Mono.empty()
         }
     }
-    fun save(auction: AuctionResponse) : Mono<Void> {
+    fun save(auction: SaleResponse) : Mono<Void> {
         return nftAuctionRepository.save(
             NftAuction(
                 id = auction.id,
                 nftId =  auction.nftId,
-                startingPrice = auction.startingPrice,
+                startingPrice = auction.price,
                 chainType = auction.chainType,
                 statusType = auction.statusType,
                 createdDate = auction.createdDateTime,
diff --git a/src/main/kotlin/com/api/nft/service/api/NftListingService.kt b/src/main/kotlin/com/api/nft/service/api/NftListingService.kt
index 1741c41..0850702 100644
--- a/src/main/kotlin/com/api/nft/service/api/NftListingService.kt
+++ b/src/main/kotlin/com/api/nft/service/api/NftListingService.kt
@@ -3,6 +3,7 @@ package com.api.nft.service.api
 import com.api.nft.domain.nft.NftListing
 import com.api.nft.domain.nft.repository.NftListingRepository
 import com.api.nft.enums.StatusType
+import com.api.nft.kafka.dto.SaleResponse
 import com.api.nft.service.RedisService
 import com.api.nft.service.dto.ListingResponse
 import org.springframework.stereotype.Service
@@ -14,7 +15,7 @@ class NftListingService(
     private val redisService: RedisService,
 ) {
 
-    fun update(newListing: ListingResponse): Mono<Void> {
+    fun update(newListing: SaleResponse): Mono<Void> {
         return when (newListing.statusType) {
             StatusType.RESERVATION -> {
                 save(newListing)
@@ -37,7 +38,7 @@ class NftListingService(
             else -> Mono.empty()
         }
     }
-    fun save(listing: ListingResponse) : Mono<NftListing> {
+    fun save(listing: SaleResponse) : Mono<NftListing> {
         return nftListingRepository.save(
             NftListing(
                 id = listing.id,
diff --git a/src/main/kotlin/com/api/nft/service/external/moralis/MoralisApiService.kt b/src/main/kotlin/com/api/nft/service/external/moralis/MoralisApiService.kt
index 605e570..41ef570 100644
--- a/src/main/kotlin/com/api/nft/service/external/moralis/MoralisApiService.kt
+++ b/src/main/kotlin/com/api/nft/service/external/moralis/MoralisApiService.kt
@@ -27,7 +27,7 @@ class MoralisApiService(
             ChainType.LINEA_SEPOLIA -> "0xe705"
             ChainType.ETHEREUM_HOLESKY -> "0x4268"
             ChainType.ETHEREUM_SEPOLIA -> "0xaa36a7"
-            ChainType.POLYGON_AMOY -> "0xaa36a7"
+            ChainType.POLYGON_AMOY -> "0x13882"
         }
         return chain
     }