diff --git a/build.gradle.kts b/build.gradle.kts index 78deaab..cae33f4 100644 --- a/build.gradle.kts +++ b/build.gradle.kts @@ -27,6 +27,7 @@ dependencies { implementation("org.springframework.boot:spring-boot-starter-batch") implementation("org.springframework.boot:spring-boot-starter-data-r2dbc") implementation("org.postgresql:r2dbc-postgresql:1.0.4.RELEASE") + implementation("org.springframework.kafka:spring-kafka") implementation("org.postgresql:postgresql:42.7.3") implementation("org.springframework.boot:spring-boot-starter-web") diff --git a/src/main/kotlin/com/api/nft/config/KafkaConfig.kt b/src/main/kotlin/com/api/nft/config/KafkaConfig.kt new file mode 100644 index 0000000..5eac751 --- /dev/null +++ b/src/main/kotlin/com/api/nft/config/KafkaConfig.kt @@ -0,0 +1,66 @@ +package com.api.nft.config + +import org.apache.kafka.clients.admin.AdminClientConfig +import org.apache.kafka.clients.admin.NewTopic +import org.apache.kafka.clients.consumer.ConsumerConfig +import org.apache.kafka.common.serialization.StringDeserializer +import org.slf4j.LoggerFactory +import org.springframework.beans.factory.annotation.Value +import org.springframework.context.annotation.Bean +import org.springframework.context.annotation.Configuration +import org.springframework.kafka.annotation.EnableKafka +import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory +import org.springframework.kafka.config.TopicBuilder +import org.springframework.kafka.core.ConsumerFactory +import org.springframework.kafka.core.DefaultKafkaConsumerFactory +import org.springframework.kafka.core.KafkaAdmin +import org.springframework.kafka.listener.CommonErrorHandler +import org.springframework.kafka.listener.MessageListenerContainer +import org.springframework.kafka.support.serializer.JsonDeserializer + +@Configuration +@EnableKafka +class KafkaConfig { + private val logger = LoggerFactory.getLogger(KafkaConfig::class.java) + + @Value("\${spring.kafka.bootstrap-servers}") + private lateinit var bootstrapServers: String + + @Bean + fun kafkaAdmin(): KafkaAdmin = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers)) + + @Bean + fun consumerFactory(): ConsumerFactory { + val props = + mapOf( + ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers, + ConsumerConfig.GROUP_ID_CONFIG to "nft-group", + ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java.name, + ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java.name, + JsonDeserializer.TRUSTED_PACKAGES to "*", + JsonDeserializer.VALUE_DEFAULT_TYPE to Any::class.java.name, + ) + return DefaultKafkaConsumerFactory(props, StringDeserializer(), JsonDeserializer(Any::class.java, false)) + } + + @Bean + fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory { + val factory = ConcurrentKafkaListenerContainerFactory() + factory.consumerFactory = consumerFactory() + factory.setConcurrency(4) + factory.setCommonErrorHandler( + object : CommonErrorHandler { + override fun handleRemaining( + thrownException: Exception, + records: List>, + consumer: org.apache.kafka.clients.consumer.Consumer<*, *>, + container: MessageListenerContainer, + ) { + logger.error("Error in consumer: ${thrownException.message}", thrownException) + logger.error("Problematic records: $records") + } + }, + ) + return factory + } +} \ No newline at end of file diff --git a/src/main/kotlin/com/api/nft/config/RabbitConfig.kt b/src/main/kotlin/com/api/nft/config/RabbitConfig.kt index ef5b415..b189c52 100644 --- a/src/main/kotlin/com/api/nft/config/RabbitConfig.kt +++ b/src/main/kotlin/com/api/nft/config/RabbitConfig.kt @@ -29,12 +29,4 @@ class RabbitConfig { @Bean fun nftExchange() = createFanoutExchange("nftExchange") - - - @Bean - fun listingExchange() = createFanoutExchange("listingExchange") - - - @Bean - fun auctionExchange() = createFanoutExchange("auctionExchange") } \ No newline at end of file diff --git a/src/main/kotlin/com/api/nft/config/RedisConfig.kt b/src/main/kotlin/com/api/nft/config/RedisConfig.kt index 67f1409..93b6d9a 100644 --- a/src/main/kotlin/com/api/nft/config/RedisConfig.kt +++ b/src/main/kotlin/com/api/nft/config/RedisConfig.kt @@ -1,5 +1,8 @@ package com.api.nft.config +import com.fasterxml.jackson.databind.ObjectMapper +import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator +import com.fasterxml.jackson.module.kotlin.KotlinModule import org.springframework.context.annotation.Bean import org.springframework.context.annotation.Configuration import org.springframework.data.redis.connection.RedisClusterConfiguration @@ -43,7 +46,7 @@ class RedisConfig { .autoReconnect(true) .pingBeforeActivateConnection(true) .build() - ) + ) .build() redisClusterConfiguration.password = RedisPassword.of("bitnami") redisClusterConfiguration.maxRedirects = 3 diff --git a/src/main/kotlin/com/api/nft/controller/NftController.kt b/src/main/kotlin/com/api/nft/controller/NftController.kt index d649da6..1e682a8 100644 --- a/src/main/kotlin/com/api/nft/controller/NftController.kt +++ b/src/main/kotlin/com/api/nft/controller/NftController.kt @@ -56,6 +56,12 @@ class NftController( return nftService.getByWalletNft(wallet,chainType) } + // collection을 구현할건데 1h / 7h /12h / 24h / 7d/ 30d + // Listing은 matched volume 이 높은순이여야돼 + + + + // aution은 현재 acution중인것 } \ No newline at end of file diff --git a/src/main/kotlin/com/api/nft/controller/dto/NftMetadataResponse.kt b/src/main/kotlin/com/api/nft/controller/dto/NftMetadataResponse.kt index 42be222..36c9a3d 100644 --- a/src/main/kotlin/com/api/nft/controller/dto/NftMetadataResponse.kt +++ b/src/main/kotlin/com/api/nft/controller/dto/NftMetadataResponse.kt @@ -2,6 +2,8 @@ package com.api.nft.controller.dto import com.api.nft.enums.ChainType import com.api.nft.enums.ContractType +import io.r2dbc.spi.Row +import java.math.BigDecimal data class NftMetadataResponse( @@ -14,4 +16,24 @@ data class NftMetadataResponse( val collectionName: String, val image: String, val lastPrice: Double?, -) + val collectionLogo: String? +){ + companion object { + fun fromRow(row: Row): NftMetadataResponse { + val idValue = row.get("id") + println("ID value type: ${idValue?.javaClass}, value: $idValue") + return NftMetadataResponse( + id = row.get("id", Integer::class.java)!!.toLong(), + tokenId = row.get("tokenid", String::class.java)!!, + tokenAddress = row.get("tokenaddress", String::class.java)!!, + chainType = ChainType.valueOf(row.get("chaintype", String::class.java)!!), + nftName = row.get("nftname", String::class.java)!!, + collectionName = row.get("collectionname", String::class.java)!!, + image = row.get("image", String::class.java) ?: "", + contractType = ContractType.valueOf(row.get("contracttype", String::class.java)!!), + lastPrice = row.get("lastprice", BigDecimal::class.java)?.toDouble(), + collectionLogo = row.get("collectionlogo", String::class.java) ?: "" + ) + } + } +} diff --git a/src/main/kotlin/com/api/nft/domain/nft/repository/NftRepositorySupportImpl.kt b/src/main/kotlin/com/api/nft/domain/nft/repository/NftRepositorySupportImpl.kt index da84206..c76c1b4 100644 --- a/src/main/kotlin/com/api/nft/domain/nft/repository/NftRepositorySupportImpl.kt +++ b/src/main/kotlin/com/api/nft/domain/nft/repository/NftRepositorySupportImpl.kt @@ -1,12 +1,9 @@ package com.api.nft.domain.nft.repository import com.api.nft.controller.dto.NftMetadataResponse -import com.api.nft.enums.ChainType -import com.api.nft.enums.ContractType import org.springframework.data.r2dbc.core.R2dbcEntityTemplate import reactor.core.publisher.Flux import reactor.core.publisher.Mono -import java.math.BigDecimal class NftRepositorySupportImpl( private val r2dbcEntityTemplate: R2dbcEntityTemplate @@ -22,9 +19,12 @@ class NftRepositorySupportImpl( n.collection_name AS collectionName, n.contract_type AS contractType, m.image AS image, - nl.price AS lastPrice + nl.price AS lastPrice, + c.logo AS collectionLogo FROM nft n + LEFT JOIN + collection c ON n.collection_name = c.name LEFT JOIN metadata m ON n.id = m.nft_id LEFT JOIN @@ -32,22 +32,10 @@ class NftRepositorySupportImpl( WHERE n.id = :$1 """ - return r2dbcEntityTemplate.databaseClient.sql(query) .bind(0, id) - .map { row, data -> - NftMetadataResponse( - id = (row.get("id") as Number).toLong(), - tokenId = row.get("tokenId", String::class.java)!!, - tokenAddress = row.get("tokenAddress", String::class.java)!!, - chainType = row.get("chainType", ChainType::class.java)!!, - nftName = row.get("nftName", String::class.java)!!, - collectionName = row.get("collectionName", String::class.java)!!, - image = row.get("image", String::class.java) ?: "", - contractType = row.get("contractType", ContractType::class.java)!!, - lastPrice = row.get("lastPrice", BigDecimal::class.java)?.toDouble(), - ) - }.first() + .map { row, _ -> NftMetadataResponse.fromRow(row) } + .first() } override fun findAllByNftJoinMetadata(ids: List): Flux { @@ -63,27 +51,18 @@ class NftRepositorySupportImpl( m.image AS image FROM nft n + LEFT JOIN + collection c ON n.collection_name = c.name LEFT JOIN metadata m ON n.id = m.nft_id - LEFT JOIN nft_listing nl ON nft.id = nl.nft_id + LEFT JOIN + nft_listing nl ON n.id = nl.nft_id WHERE n.id IN (:$1) """ return r2dbcEntityTemplate.databaseClient.sql(query) .bind(0, ids) - .map { row, metadata -> - NftMetadataResponse( - id = (row.get("id") as Number).toLong(), - tokenId = row.get("tokenId", String::class.java)!!, - tokenAddress = row.get("tokenAddress", String::class.java)!!, - chainType = row.get("chainType", ChainType::class.java)!!, - nftName = row.get("nftName", String::class.java)!!, - collectionName = row.get("collectionName", String::class.java)!!, - image = row.get("image", String::class.java) ?: "", - contractType = row.get("contractType", ContractType::class.java)!!, - lastPrice = row.get("lastPrice", BigDecimal::class.java)?.toDouble(), - ) - } + .map { row, _ -> NftMetadataResponse.fromRow(row) } .all() } diff --git a/src/main/kotlin/com/api/nft/enums/Enums.kt b/src/main/kotlin/com/api/nft/enums/Enums.kt index 7dde924..39df512 100644 --- a/src/main/kotlin/com/api/nft/enums/Enums.kt +++ b/src/main/kotlin/com/api/nft/enums/Enums.kt @@ -11,6 +11,7 @@ enum class ChainType{ } +enum class OrderType { LISTING, AUCTION } enum class NetworkType{ ETHEREUM, @@ -25,4 +26,4 @@ enum class TokenType { SAND, MATIC, ETH, BTC } -enum class StatusType { RESERVATION, ACTIVED, RESERVATION_CANCEL, CANCEL, EXPIRED,LISTING, AUCTION } \ No newline at end of file +enum class StatusType { RESERVATION, ACTIVED, RESERVATION_CANCEL, CANCEL, EXPIRED,LISTING, AUCTION,LEDGER } diff --git a/src/main/kotlin/com/api/nft/kafka/KafkaConsumer.kt b/src/main/kotlin/com/api/nft/kafka/KafkaConsumer.kt new file mode 100644 index 0000000..5f540fe --- /dev/null +++ b/src/main/kotlin/com/api/nft/kafka/KafkaConsumer.kt @@ -0,0 +1,37 @@ +package com.api.nft.kafka + +import com.api.nft.enums.OrderType +import com.api.nft.kafka.dto.SaleResponse +import com.api.nft.service.api.NftAuctionService +import com.api.nft.service.api.NftListingService +import com.fasterxml.jackson.databind.ObjectMapper +import org.springframework.kafka.annotation.KafkaListener +import org.springframework.messaging.Message +import org.springframework.stereotype.Service + +@Service +class KafkaConsumer( + private val objectMapper: ObjectMapper, + private val nftListingService: NftListingService, + private val nftAuctionService: NftAuctionService +) { + @KafkaListener(topics = ["sale-topic"], + groupId = "nft-group", + containerFactory = "kafkaListenerContainerFactory" + ) + fun consumeLedgerStatusEvents(message: Message) { + val payload = message.payload + + if (payload is LinkedHashMap<*, *>) { + val saleStatusRequest = objectMapper.convertValue(payload, SaleResponse::class.java) + println("saleStatusRequest : " + saleStatusRequest) + when(saleStatusRequest.orderType){ + OrderType.LISTING -> nftListingService.update(saleStatusRequest).subscribe() + OrderType.AUCTION -> nftAuctionService.update(saleStatusRequest).subscribe() + } + + // orderService.updateOrderStatus(ledgerStatusRequest).subscribe() + } + } + +} \ No newline at end of file diff --git a/src/main/kotlin/com/api/nft/kafka/dto/SaleResponse.kt b/src/main/kotlin/com/api/nft/kafka/dto/SaleResponse.kt new file mode 100644 index 0000000..409d246 --- /dev/null +++ b/src/main/kotlin/com/api/nft/kafka/dto/SaleResponse.kt @@ -0,0 +1,20 @@ +package com.api.nft.kafka.dto + +import com.api.nft.enums.ChainType +import com.api.nft.enums.StatusType +import com.api.nft.enums.OrderType +import java.math.BigDecimal + +data class SaleResponse( + val id : Long, + val nftId : Long, + val address: String, + val createdDateTime: Long, + val endDateTime: Long, + val statusType: StatusType, + val price: BigDecimal, + val chainType: ChainType, + val orderType: OrderType +) + + diff --git a/src/main/kotlin/com/api/nft/rabbitMQ/RabbitMQReceiver.kt b/src/main/kotlin/com/api/nft/rabbitMQ/RabbitMQReceiver.kt deleted file mode 100644 index 3dc84bf..0000000 --- a/src/main/kotlin/com/api/nft/rabbitMQ/RabbitMQReceiver.kt +++ /dev/null @@ -1,36 +0,0 @@ -package com.api.nft.rabbitMQ - -import com.api.nft.service.api.NftAuctionService -import com.api.nft.service.api.NftListingService -import com.api.nft.service.dto.ListingResponse -import org.springframework.amqp.core.ExchangeTypes -import org.springframework.amqp.rabbit.annotation.Exchange -import org.springframework.amqp.rabbit.annotation.Queue -import org.springframework.amqp.rabbit.annotation.QueueBinding -import org.springframework.amqp.rabbit.annotation.RabbitListener -import org.springframework.stereotype.Service -import com.api.nft.service.dto.AuctionResponse as AuctionResponse - -@Service -class RabbitMQReceiver( - private val nftListingService: NftListingService, - private val nftAuctionService: NftAuctionService -) { - - - @RabbitListener(bindings = [QueueBinding( - value = Queue(name = "", durable = "false", exclusive = "true", autoDelete = "true"), - exchange = Exchange(value = "listingExchange", type = ExchangeTypes.FANOUT) - )]) - fun listingMessage(listing: ListingResponse){ - nftListingService.update(listing).subscribe() - } - - @RabbitListener(bindings = [QueueBinding( - value = Queue(name = "", durable = "false", exclusive = "true", autoDelete = "true"), - exchange = Exchange(value = "auctionExchange", type = ExchangeTypes.FANOUT) - )]) - fun auctionMessage(auction: AuctionResponse){ - nftAuctionService.update(auction).subscribe() - } -} \ No newline at end of file diff --git a/src/main/kotlin/com/api/nft/service/api/NftAuctionService.kt b/src/main/kotlin/com/api/nft/service/api/NftAuctionService.kt index 78dae6d..8fee8cb 100644 --- a/src/main/kotlin/com/api/nft/service/api/NftAuctionService.kt +++ b/src/main/kotlin/com/api/nft/service/api/NftAuctionService.kt @@ -3,6 +3,7 @@ package com.api.nft.service.api import com.api.nft.domain.nft.NftAuction import com.api.nft.domain.nft.repository.NftAuctionRepository import com.api.nft.enums.StatusType +import com.api.nft.kafka.dto.SaleResponse import com.api.nft.service.dto.AuctionResponse import org.springframework.stereotype.Service import reactor.core.publisher.Mono @@ -12,7 +13,7 @@ class NftAuctionService( private val nftAuctionRepository: NftAuctionRepository, ) { - fun update(newAuction: AuctionResponse): Mono { + fun update(newAuction: SaleResponse): Mono { return when (newAuction.statusType) { StatusType.RESERVATION -> { save(newAuction) @@ -23,7 +24,7 @@ class NftAuctionService( nftAuctionRepository.updateAuction(nftId = newAuction.nftId, statusType = StatusType.AUCTION) } } - StatusType.RESERVATION_CANCEL, StatusType.CANCEL, StatusType.EXPIRED -> { + StatusType.RESERVATION_CANCEL, StatusType.CANCEL, StatusType.EXPIRED, StatusType.LEDGER -> { nftAuctionRepository.findByNftId(newAuction.nftId) .flatMap { nftAuction -> nftAuctionRepository.deleteByNftId(nftAuction.nftId) @@ -33,12 +34,12 @@ class NftAuctionService( else -> Mono.empty() } } - fun save(auction: AuctionResponse) : Mono { + fun save(auction: SaleResponse) : Mono { return nftAuctionRepository.save( NftAuction( id = auction.id, nftId = auction.nftId, - startingPrice = auction.startingPrice, + startingPrice = auction.price, chainType = auction.chainType, statusType = auction.statusType, createdDate = auction.createdDateTime, diff --git a/src/main/kotlin/com/api/nft/service/api/NftListingService.kt b/src/main/kotlin/com/api/nft/service/api/NftListingService.kt index d40058c..0850702 100644 --- a/src/main/kotlin/com/api/nft/service/api/NftListingService.kt +++ b/src/main/kotlin/com/api/nft/service/api/NftListingService.kt @@ -3,6 +3,7 @@ package com.api.nft.service.api import com.api.nft.domain.nft.NftListing import com.api.nft.domain.nft.repository.NftListingRepository import com.api.nft.enums.StatusType +import com.api.nft.kafka.dto.SaleResponse import com.api.nft.service.RedisService import com.api.nft.service.dto.ListingResponse import org.springframework.stereotype.Service @@ -14,7 +15,7 @@ class NftListingService( private val redisService: RedisService, ) { - fun update(newListing: ListingResponse): Mono { + fun update(newListing: SaleResponse): Mono { return when (newListing.statusType) { StatusType.RESERVATION -> { save(newListing) @@ -27,7 +28,7 @@ class NftListingService( } .then(redisService.updateToRedis(newListing.nftId)) } - StatusType.RESERVATION_CANCEL, StatusType.CANCEL, StatusType.EXPIRED -> { + StatusType.RESERVATION_CANCEL, StatusType.CANCEL, StatusType.EXPIRED, StatusType.LEDGER -> { nftListingRepository.findByNftId(newListing.nftId) .flatMap { nftListing -> nftListingRepository.deleteByNftId(nftListing.nftId) @@ -37,7 +38,7 @@ class NftListingService( else -> Mono.empty() } } - fun save(listing: ListingResponse) : Mono { + fun save(listing: SaleResponse) : Mono { return nftListingRepository.save( NftListing( id = listing.id, diff --git a/src/main/kotlin/com/api/nft/service/external/moralis/MoralisApiService.kt b/src/main/kotlin/com/api/nft/service/external/moralis/MoralisApiService.kt index 605e570..41ef570 100644 --- a/src/main/kotlin/com/api/nft/service/external/moralis/MoralisApiService.kt +++ b/src/main/kotlin/com/api/nft/service/external/moralis/MoralisApiService.kt @@ -27,7 +27,7 @@ class MoralisApiService( ChainType.LINEA_SEPOLIA -> "0xe705" ChainType.ETHEREUM_HOLESKY -> "0x4268" ChainType.ETHEREUM_SEPOLIA -> "0xaa36a7" - ChainType.POLYGON_AMOY -> "0xaa36a7" + ChainType.POLYGON_AMOY -> "0x13882" } return chain } diff --git a/src/main/resources/db/postgresql/migration/V1__Initial_schema.sql b/src/main/resources/db/postgresql/migration/V1__Initial_schema.sql index bb8cbbc..c685dc5 100644 --- a/src/main/resources/db/postgresql/migration/V1__Initial_schema.sql +++ b/src/main/resources/db/postgresql/migration/V1__Initial_schema.sql @@ -24,7 +24,8 @@ CREATE TYPE token_type AS ENUM ( CREATE TYPE status_type AS ENUM ( 'RESERVATION', 'LISTING', - 'AUCTION' + 'AUCTION', + 'LEDGER' ); diff --git a/src/test/kotlin/com/api/nft/NftTest.kt b/src/test/kotlin/com/api/nft/NftTest.kt index f080130..28bc5c2 100644 --- a/src/test/kotlin/com/api/nft/NftTest.kt +++ b/src/test/kotlin/com/api/nft/NftTest.kt @@ -22,10 +22,12 @@ import org.junit.jupiter.api.Test import org.springframework.beans.factory.annotation.Autowired import org.springframework.boot.test.context.SpringBootTest import org.springframework.context.ApplicationEventPublisher +import org.springframework.test.context.ActiveProfiles import reactor.test.StepVerifier import java.math.BigDecimal @SpringBootTest +@ActiveProfiles("local") class NftTest( @Autowired private val moralisApiService: MoralisApiService, @Autowired private val nftService: NftService, @@ -169,4 +171,10 @@ class NftTest( println(res.toString()) } + @Test + fun test1() { + val nft = nftRepository.findByNftJoinMetadata(10L).block() + println(nft.toString()) + } + } \ No newline at end of file