Skip to content

Commit

Permalink
feat: Threading Improvements for Query Calls (#202)
Browse files Browse the repository at this point in the history
* clean up query threading specifically around listing conversations

* fix up the unit tests

* feat: fix up all the tests to thread

* fix up the example app
  • Loading branch information
nplasterer authored Mar 21, 2024
1 parent 49e94db commit 80559d7
Show file tree
Hide file tree
Showing 23 changed files with 277 additions and 267 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.xmtp.android.example.extension.flowWhileShared
import org.xmtp.android.example.extension.stateFlow
import org.xmtp.android.example.pushnotifications.PushNotificationTokenManager
Expand Down Expand Up @@ -70,7 +71,7 @@ class MainViewModel : ViewModel() {

@WorkerThread
private fun fetchMostRecentMessage(conversation: Conversation): DecodedMessage? {
return conversation.messages(limit = 1).firstOrNull()
return runBlocking { conversation.messages(limit = 1).firstOrNull() }
}

@OptIn(ExperimentalCoroutinesApi::class)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ import kotlinx.coroutines.flow.emptyFlow
import kotlinx.coroutines.flow.flowOn
import kotlinx.coroutines.flow.mapLatest
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.xmtp.android.example.ClientManager
import org.xmtp.android.example.extension.flowWhileShared
import org.xmtp.android.example.extension.stateFlow
Expand Down Expand Up @@ -77,7 +78,12 @@ class ConversationDetailViewModel(private val savedStateHandle: SavedStateHandle
stateFlow(viewModelScope, null) { subscriptionCount ->
if (conversation == null) {
conversation =
ClientManager.client.fetchConversation(conversationTopic, includeGroups = false)
runBlocking {
ClientManager.client.fetchConversation(
conversationTopic,
includeGroups = false
)
}
}
if (conversation != null) {
conversation!!.streamMessages()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import com.google.firebase.messaging.RemoteMessage
import kotlinx.coroutines.Dispatchers
import kotlinx.coroutines.GlobalScope
import kotlinx.coroutines.launch
import kotlinx.coroutines.runBlocking
import org.xmtp.android.example.ClientManager
import org.xmtp.android.example.R
import org.xmtp.android.example.conversation.ConversationDetailActivity
Expand Down Expand Up @@ -56,7 +57,8 @@ class PushNotificationsService : FirebaseMessagingService() {
GlobalScope.launch(Dispatchers.Main) {
ClientManager.createClient(keysData, applicationContext)
}
val conversation = ClientManager.client.fetchConversation(topic, includeGroups = true)
val conversation =
runBlocking { ClientManager.client.fetchConversation(topic, includeGroups = true) }
if (conversation == null) {
Log.e(TAG, "No keys or conversation persisted")
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ class AttachmentTest {
options = SendOptions(contentType = ContentTypeAttachment),
)
}
val messages = aliceConversation.messages()
val messages = runBlocking { aliceConversation.messages() }
assertEquals(messages.size, 1)
if (messages.size == 1) {
val content: Attachment? = messages[0].content()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -148,11 +148,12 @@ class ClientTest {
appContext = context
)
)

runBlocking {
client.conversations.newGroup(listOf(client2.address))
client.conversations.syncGroups()
assertEquals(client.conversations.listGroups().size, 1)
}
assertEquals(client.conversations.listGroups().size, 1)

client.deleteLocalDatabase()

Expand All @@ -166,8 +167,10 @@ class ClientTest {
)
)

runBlocking { client.conversations.syncGroups() }
assertEquals(client.conversations.listGroups().size, 0)
runBlocking {
client.conversations.syncGroups()
assertEquals(client.conversations.listGroups().size, 0)
}
}

@Test
Expand Down Expand Up @@ -209,7 +212,7 @@ class ClientTest {
val notOnNetwork = PrivateKeyBuilder()
val opts = ClientOptions(ClientOptions.Api(XMTPEnvironment.LOCAL, false))
val aliceClient = Client().create(aliceWallet, opts)
aliceClient.ensureUserContactPublished()
runBlocking { aliceClient.ensureUserContactPublished() }

val canMessage = Client.canMessage(aliceWallet.address, opts)
val cannotMessage = Client.canMessage(notOnNetwork.address, opts)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -68,7 +68,7 @@ class CodecTest {
options = SendOptions(contentType = NumberCodec().contentType),
)
}
val messages = aliceConversation.messages()
val messages = runBlocking { aliceConversation.messages() }
assertEquals(messages.size, 1)
if (messages.size == 1) {
val content: Double? = messages[0].content()
Expand All @@ -93,7 +93,7 @@ class CodecTest {
options = SendOptions(contentType = CompositeCodec().contentType),
)
}
val messages = aliceConversation.messages()
val messages = runBlocking { aliceConversation.messages() }
val decoded: DecodedComposite? = messages[0].content()
assertEquals("hiya", decoded?.content())
}
Expand Down Expand Up @@ -121,7 +121,7 @@ class CodecTest {
options = SendOptions(contentType = CompositeCodec().contentType),
)
}
val messages = aliceConversation.messages()
val messages = runBlocking { aliceConversation.messages() }
val decoded: DecodedComposite? = messages[0].content()
val part1 = decoded!!.parts[0]
val part2 = decoded.parts[1].parts[0]
Expand All @@ -144,7 +144,7 @@ class CodecTest {
options = SendOptions(contentType = codec.contentType),
)
}
val messages = aliceConversation.messages()
val messages = runBlocking { aliceConversation.messages() }
assert(messages.isNotEmpty())

val message = MessageV2Builder.buildEncode(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package org.xmtp.android.library

import androidx.test.ext.junit.runners.AndroidJUnit4
import kotlinx.coroutines.runBlocking
import org.junit.Assert.assertEquals
import org.junit.Test
import org.junit.runner.RunWith
Expand All @@ -12,7 +13,7 @@ class ContactsTest {
@Test
fun testNormalizesAddresses() {
val fixtures = fixtures()
fixtures.bobClient.ensureUserContactPublished()
runBlocking { fixtures.bobClient.ensureUserContactPublished() }
val bobAddressLowerCased = fixtures.bobClient.address.lowercase()
val bobContact = fixtures.aliceClient.getUserContact(peerAddress = bobAddressLowerCased)
assert(bobContact != null)
Expand All @@ -21,15 +22,15 @@ class ContactsTest {
@Test
fun testCanFindContact() {
val fixtures = fixtures()
fixtures.bobClient.ensureUserContactPublished()
runBlocking { fixtures.bobClient.ensureUserContactPublished() }
val contactBundle = fixtures.aliceClient.contacts.find(fixtures.bob.walletAddress)
assertEquals(contactBundle?.walletAddress, fixtures.bob.walletAddress)
}

@Test
fun testCachesContacts() {
val fixtures = fixtures()
fixtures.bobClient.ensureUserContactPublished()
runBlocking { fixtures.bobClient.ensureUserContactPublished() }
// Look up the first time
fixtures.aliceClient.contacts.find(fixtures.bob.walletAddress)
fixtures.fakeApiClient.assertNoQuery {
Expand All @@ -48,7 +49,7 @@ class ContactsTest {

assert(!result)

contacts.allow(listOf(fixtures.alice.walletAddress))
runBlocking { contacts.allow(listOf(fixtures.alice.walletAddress)) }

result = contacts.isAllowed(fixtures.alice.walletAddress)
assert(result)
Expand All @@ -63,7 +64,7 @@ class ContactsTest {

assert(!result)

contacts.deny(listOf(fixtures.alice.walletAddress))
runBlocking { contacts.deny(listOf(fixtures.alice.walletAddress)) }

result = contacts.isDenied(fixtures.alice.walletAddress)
assert(result)
Expand Down
Loading

0 comments on commit 80559d7

Please sign in to comment.