Skip to content

Commit

Permalink
Merge pull request #23 from NTF-marketplace/develop
Browse files Browse the repository at this point in the history
Develop
  • Loading branch information
min-96 authored Sep 24, 2024
2 parents 101e4d3 + 27ffe01 commit 16bef4b
Show file tree
Hide file tree
Showing 16 changed files with 190 additions and 88 deletions.
1 change: 1 addition & 0 deletions build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ dependencies {
implementation("org.springframework.boot:spring-boot-starter-batch")
implementation("org.springframework.boot:spring-boot-starter-data-r2dbc")
implementation("org.postgresql:r2dbc-postgresql:1.0.4.RELEASE")
implementation("org.springframework.kafka:spring-kafka")
implementation("org.postgresql:postgresql:42.7.3")

implementation("org.springframework.boot:spring-boot-starter-web")
Expand Down
66 changes: 66 additions & 0 deletions src/main/kotlin/com/api/nft/config/KafkaConfig.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,66 @@
package com.api.nft.config

import org.apache.kafka.clients.admin.AdminClientConfig
import org.apache.kafka.clients.admin.NewTopic
import org.apache.kafka.clients.consumer.ConsumerConfig
import org.apache.kafka.common.serialization.StringDeserializer
import org.slf4j.LoggerFactory
import org.springframework.beans.factory.annotation.Value
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.kafka.annotation.EnableKafka
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory
import org.springframework.kafka.config.TopicBuilder
import org.springframework.kafka.core.ConsumerFactory
import org.springframework.kafka.core.DefaultKafkaConsumerFactory
import org.springframework.kafka.core.KafkaAdmin
import org.springframework.kafka.listener.CommonErrorHandler
import org.springframework.kafka.listener.MessageListenerContainer
import org.springframework.kafka.support.serializer.JsonDeserializer

@Configuration
@EnableKafka
class KafkaConfig {
private val logger = LoggerFactory.getLogger(KafkaConfig::class.java)

@Value("\${spring.kafka.bootstrap-servers}")
private lateinit var bootstrapServers: String

@Bean
fun kafkaAdmin(): KafkaAdmin = KafkaAdmin(mapOf(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers))

@Bean
fun consumerFactory(): ConsumerFactory<String, Any> {
val props =
mapOf(
ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG to bootstrapServers,
ConsumerConfig.GROUP_ID_CONFIG to "nft-group",
ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG to StringDeserializer::class.java.name,
ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG to JsonDeserializer::class.java.name,
JsonDeserializer.TRUSTED_PACKAGES to "*",
JsonDeserializer.VALUE_DEFAULT_TYPE to Any::class.java.name,
)
return DefaultKafkaConsumerFactory(props, StringDeserializer(), JsonDeserializer(Any::class.java, false))
}

@Bean
fun kafkaListenerContainerFactory(): ConcurrentKafkaListenerContainerFactory<String, Any> {
val factory = ConcurrentKafkaListenerContainerFactory<String, Any>()
factory.consumerFactory = consumerFactory()
factory.setConcurrency(4)
factory.setCommonErrorHandler(
object : CommonErrorHandler {
override fun handleRemaining(
thrownException: Exception,
records: List<org.apache.kafka.clients.consumer.ConsumerRecord<*, *>>,
consumer: org.apache.kafka.clients.consumer.Consumer<*, *>,
container: MessageListenerContainer,
) {
logger.error("Error in consumer: ${thrownException.message}", thrownException)
logger.error("Problematic records: $records")
}
},
)
return factory
}
}
8 changes: 0 additions & 8 deletions src/main/kotlin/com/api/nft/config/RabbitConfig.kt
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,4 @@ class RabbitConfig {

@Bean
fun nftExchange() = createFanoutExchange("nftExchange")


@Bean
fun listingExchange() = createFanoutExchange("listingExchange")


@Bean
fun auctionExchange() = createFanoutExchange("auctionExchange")
}
5 changes: 4 additions & 1 deletion src/main/kotlin/com/api/nft/config/RedisConfig.kt
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
package com.api.nft.config

import com.fasterxml.jackson.databind.ObjectMapper
import com.fasterxml.jackson.databind.jsontype.BasicPolymorphicTypeValidator
import com.fasterxml.jackson.module.kotlin.KotlinModule
import org.springframework.context.annotation.Bean
import org.springframework.context.annotation.Configuration
import org.springframework.data.redis.connection.RedisClusterConfiguration
Expand Down Expand Up @@ -43,7 +46,7 @@ class RedisConfig {
.autoReconnect(true)
.pingBeforeActivateConnection(true)
.build()
)
)
.build()
redisClusterConfiguration.password = RedisPassword.of("bitnami")
redisClusterConfiguration.maxRedirects = 3
Expand Down
6 changes: 6 additions & 0 deletions src/main/kotlin/com/api/nft/controller/NftController.kt
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,12 @@ class NftController(
return nftService.getByWalletNft(wallet,chainType)

}
// collection을 구현할건데 1h / 7h /12h / 24h / 7d/ 30d
// Listing은 matched volume 이 높은순이여야돼



// aution은 현재 acution중인것


}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package com.api.nft.controller.dto

import com.api.nft.enums.ChainType
import com.api.nft.enums.ContractType
import io.r2dbc.spi.Row
import java.math.BigDecimal


data class NftMetadataResponse(
Expand All @@ -14,4 +16,24 @@ data class NftMetadataResponse(
val collectionName: String,
val image: String,
val lastPrice: Double?,
)
val collectionLogo: String?
){
companion object {
fun fromRow(row: Row): NftMetadataResponse {
val idValue = row.get("id")
println("ID value type: ${idValue?.javaClass}, value: $idValue")
return NftMetadataResponse(
id = row.get("id", Integer::class.java)!!.toLong(),
tokenId = row.get("tokenid", String::class.java)!!,
tokenAddress = row.get("tokenaddress", String::class.java)!!,
chainType = ChainType.valueOf(row.get("chaintype", String::class.java)!!),
nftName = row.get("nftname", String::class.java)!!,
collectionName = row.get("collectionname", String::class.java)!!,
image = row.get("image", String::class.java) ?: "",
contractType = ContractType.valueOf(row.get("contracttype", String::class.java)!!),
lastPrice = row.get("lastprice", BigDecimal::class.java)?.toDouble(),
collectionLogo = row.get("collectionlogo", String::class.java) ?: ""
)
}
}
}
Original file line number Diff line number Diff line change
@@ -1,12 +1,9 @@
package com.api.nft.domain.nft.repository

import com.api.nft.controller.dto.NftMetadataResponse
import com.api.nft.enums.ChainType
import com.api.nft.enums.ContractType
import org.springframework.data.r2dbc.core.R2dbcEntityTemplate
import reactor.core.publisher.Flux
import reactor.core.publisher.Mono
import java.math.BigDecimal

class NftRepositorySupportImpl(
private val r2dbcEntityTemplate: R2dbcEntityTemplate
Expand All @@ -22,32 +19,23 @@ class NftRepositorySupportImpl(
n.collection_name AS collectionName,
n.contract_type AS contractType,
m.image AS image,
nl.price AS lastPrice
nl.price AS lastPrice,
c.logo AS collectionLogo
FROM
nft n
LEFT JOIN
collection c ON n.collection_name = c.name
LEFT JOIN
metadata m ON n.id = m.nft_id
LEFT JOIN
nft_listing nl ON n.id = nl.nft_id
WHERE
n.id = :$1
"""

return r2dbcEntityTemplate.databaseClient.sql(query)
.bind(0, id)
.map { row, data ->
NftMetadataResponse(
id = (row.get("id") as Number).toLong(),
tokenId = row.get("tokenId", String::class.java)!!,
tokenAddress = row.get("tokenAddress", String::class.java)!!,
chainType = row.get("chainType", ChainType::class.java)!!,
nftName = row.get("nftName", String::class.java)!!,
collectionName = row.get("collectionName", String::class.java)!!,
image = row.get("image", String::class.java) ?: "",
contractType = row.get("contractType", ContractType::class.java)!!,
lastPrice = row.get("lastPrice", BigDecimal::class.java)?.toDouble(),
)
}.first()
.map { row, _ -> NftMetadataResponse.fromRow(row) }
.first()
}

override fun findAllByNftJoinMetadata(ids: List<Long>): Flux<NftMetadataResponse> {
Expand All @@ -63,27 +51,18 @@ class NftRepositorySupportImpl(
m.image AS image
FROM
nft n
LEFT JOIN
collection c ON n.collection_name = c.name
LEFT JOIN
metadata m ON n.id = m.nft_id
LEFT JOIN nft_listing nl ON nft.id = nl.nft_id
LEFT JOIN
nft_listing nl ON n.id = nl.nft_id
WHERE
n.id IN (:$1)
"""
return r2dbcEntityTemplate.databaseClient.sql(query)
.bind(0, ids)
.map { row, metadata ->
NftMetadataResponse(
id = (row.get("id") as Number).toLong(),
tokenId = row.get("tokenId", String::class.java)!!,
tokenAddress = row.get("tokenAddress", String::class.java)!!,
chainType = row.get("chainType", ChainType::class.java)!!,
nftName = row.get("nftName", String::class.java)!!,
collectionName = row.get("collectionName", String::class.java)!!,
image = row.get("image", String::class.java) ?: "",
contractType = row.get("contractType", ContractType::class.java)!!,
lastPrice = row.get("lastPrice", BigDecimal::class.java)?.toDouble(),
)
}
.map { row, _ -> NftMetadataResponse.fromRow(row) }
.all()
}

Expand Down
3 changes: 2 additions & 1 deletion src/main/kotlin/com/api/nft/enums/Enums.kt
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ enum class ChainType{

}

enum class OrderType { LISTING, AUCTION }

enum class NetworkType{
ETHEREUM,
Expand All @@ -25,4 +26,4 @@ enum class TokenType {
SAND, MATIC, ETH, BTC
}

enum class StatusType { RESERVATION, ACTIVED, RESERVATION_CANCEL, CANCEL, EXPIRED,LISTING, AUCTION }
enum class StatusType { RESERVATION, ACTIVED, RESERVATION_CANCEL, CANCEL, EXPIRED,LISTING, AUCTION,LEDGER }
37 changes: 37 additions & 0 deletions src/main/kotlin/com/api/nft/kafka/KafkaConsumer.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
package com.api.nft.kafka

import com.api.nft.enums.OrderType
import com.api.nft.kafka.dto.SaleResponse
import com.api.nft.service.api.NftAuctionService
import com.api.nft.service.api.NftListingService
import com.fasterxml.jackson.databind.ObjectMapper
import org.springframework.kafka.annotation.KafkaListener
import org.springframework.messaging.Message
import org.springframework.stereotype.Service

@Service
class KafkaConsumer(
private val objectMapper: ObjectMapper,
private val nftListingService: NftListingService,
private val nftAuctionService: NftAuctionService
) {
@KafkaListener(topics = ["sale-topic"],
groupId = "nft-group",
containerFactory = "kafkaListenerContainerFactory"
)
fun consumeLedgerStatusEvents(message: Message<Any>) {
val payload = message.payload

if (payload is LinkedHashMap<*, *>) {
val saleStatusRequest = objectMapper.convertValue(payload, SaleResponse::class.java)
println("saleStatusRequest : " + saleStatusRequest)
when(saleStatusRequest.orderType){
OrderType.LISTING -> nftListingService.update(saleStatusRequest).subscribe()
OrderType.AUCTION -> nftAuctionService.update(saleStatusRequest).subscribe()
}

// orderService.updateOrderStatus(ledgerStatusRequest).subscribe()
}
}

}
20 changes: 20 additions & 0 deletions src/main/kotlin/com/api/nft/kafka/dto/SaleResponse.kt
Original file line number Diff line number Diff line change
@@ -0,0 +1,20 @@
package com.api.nft.kafka.dto

import com.api.nft.enums.ChainType
import com.api.nft.enums.StatusType
import com.api.nft.enums.OrderType
import java.math.BigDecimal

data class SaleResponse(
val id : Long,
val nftId : Long,
val address: String,
val createdDateTime: Long,
val endDateTime: Long,
val statusType: StatusType,
val price: BigDecimal,
val chainType: ChainType,
val orderType: OrderType
)


36 changes: 0 additions & 36 deletions src/main/kotlin/com/api/nft/rabbitMQ/RabbitMQReceiver.kt

This file was deleted.

9 changes: 5 additions & 4 deletions src/main/kotlin/com/api/nft/service/api/NftAuctionService.kt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package com.api.nft.service.api
import com.api.nft.domain.nft.NftAuction
import com.api.nft.domain.nft.repository.NftAuctionRepository
import com.api.nft.enums.StatusType
import com.api.nft.kafka.dto.SaleResponse
import com.api.nft.service.dto.AuctionResponse
import org.springframework.stereotype.Service
import reactor.core.publisher.Mono
Expand All @@ -12,7 +13,7 @@ class NftAuctionService(
private val nftAuctionRepository: NftAuctionRepository,
) {

fun update(newAuction: AuctionResponse): Mono<Void> {
fun update(newAuction: SaleResponse): Mono<Void> {
return when (newAuction.statusType) {
StatusType.RESERVATION -> {
save(newAuction)
Expand All @@ -23,7 +24,7 @@ class NftAuctionService(
nftAuctionRepository.updateAuction(nftId = newAuction.nftId, statusType = StatusType.AUCTION)
}
}
StatusType.RESERVATION_CANCEL, StatusType.CANCEL, StatusType.EXPIRED -> {
StatusType.RESERVATION_CANCEL, StatusType.CANCEL, StatusType.EXPIRED, StatusType.LEDGER -> {
nftAuctionRepository.findByNftId(newAuction.nftId)
.flatMap { nftAuction ->
nftAuctionRepository.deleteByNftId(nftAuction.nftId)
Expand All @@ -33,12 +34,12 @@ class NftAuctionService(
else -> Mono.empty()
}
}
fun save(auction: AuctionResponse) : Mono<Void> {
fun save(auction: SaleResponse) : Mono<Void> {
return nftAuctionRepository.save(
NftAuction(
id = auction.id,
nftId = auction.nftId,
startingPrice = auction.startingPrice,
startingPrice = auction.price,
chainType = auction.chainType,
statusType = auction.statusType,
createdDate = auction.createdDateTime,
Expand Down
Loading

0 comments on commit 16bef4b

Please sign in to comment.