Skip to content

Commit

Permalink
Fetch MAM pages when scrolling
Browse files Browse the repository at this point in the history
  • Loading branch information
kkonsw committed Mar 18, 2024
1 parent ecb3fb0 commit e02f2c9
Show file tree
Hide file tree
Showing 6 changed files with 94 additions and 15 deletions.
20 changes: 18 additions & 2 deletions libdino/src/service/content_item_store.vala
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ using Xmpp;

namespace Dino {

const int HISTORY_SYNC_MAM_PAGES = 10;

public class ContentItemStore : StreamInteractionModule, Object {
public static ModuleIdentity<ContentItemStore> IDENTITY = new ModuleIdentity<ContentItemStore>("content_item_store");
public string id { get { return IDENTITY.id; } }

public signal void new_item(ContentItem item, Conversation conversation);
public signal void history_loaded(Conversation conversation, ContentItem item, int count);

private StreamInteractor stream_interactor;
private Database db;
Expand Down Expand Up @@ -241,8 +244,10 @@ public class ContentItemStore : StreamInteractionModule, Object {
// return ret;
// }

public Gee.List<ContentItem> get_before(Conversation conversation, ContentItem item, int count) {
public Gee.List<ContentItem> get_before(Conversation conversation, ContentItem item, int count, bool request_from_server = true) {
debug("Fetching earlier messages from the db");
long time = (long) item.time.to_unix();

QueryBuilder select = db.content_item.select()
.where(@"time < ? OR (time = ? AND id < ?)", { time.to_string(), time.to_string(), item.id.to_string() })
.with(db.content_item.conversation_id, "=", conversation.id)
Expand All @@ -251,7 +256,18 @@ public class ContentItemStore : StreamInteractionModule, Object {
.order_by(db.content_item.id, "DESC")
.limit(count);

return get_items_from_query(select, conversation);
var items = get_items_from_query(select, conversation);
if (items.size == 0 && request_from_server) {
// Async request to get earlier messages from the server
var history_sync = stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync;
history_sync.fetch_data.begin(conversation.account, conversation.counterpart.bare_jid, item.time, HISTORY_SYNC_MAM_PAGES, (_, res) => {
history_sync.fetch_data.end(res);
debug("History loaded");
history_loaded(conversation, item, count);
});
}

return items;
}

public Gee.List<ContentItem> get_after(Conversation conversation, ContentItem item, int count) {
Expand Down
30 changes: 30 additions & 0 deletions libdino/src/service/history_sync.vala
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,36 @@ public class Dino.HistorySync {
}
}

private async PageRequestResult fetch_pages(Account account, Xmpp.MessageArchiveManagement.V2.MamQueryParams query_params, int pages) {
debug("[%s | %s] Fetch query %s - %s", account.bare_jid.to_string(), query_params.mam_server.to_string(), query_params.start != null ? query_params.start.to_string() : "", query_params.end != null ? query_params.end.to_string() : "");
PageRequestResult? page_result = null;

int processed_pages = 0;
do {
page_result = yield get_mam_page(account, query_params, page_result, null);
processed_pages++;

debug("[%s | %s] Page result %s (got stanzas: %s)", account.bare_jid.to_string(), query_params.mam_server.to_string(), page_result.page_result.to_string(), (page_result.stanzas != null).to_string());
if (processed_pages == pages) {
break;
}

if (page_result.page_result == PageResult.Error || page_result.page_result == PageResult.Cancelled || page_result.query_result.first == null) {
return page_result;
}

} while (page_result.page_result == PageResult.MorePagesAvailable);

return page_result;
}

public async void fetch_data(Account account, Jid target, DateTime latest, int pages) {
debug("Fetch history for %s", target.to_string());

var query_params = new Xmpp.MessageArchiveManagement.V2.MamQueryParams.query_before(target, latest, null);
yield fetch_pages(account, query_params, pages);
}

public async void fetch_history(Account account, Jid target, Cancellable? cancellable = null) {
debug("Fetch history for %s", target.to_string());

Expand Down
27 changes: 23 additions & 4 deletions libdino/src/service/message_processor.vala
Original file line number Diff line number Diff line change
Expand Up @@ -134,10 +134,14 @@ public class MessageProcessor : StreamInteractionModule, Object {
Entities.Message message = yield parse_message_stanza(account, message_stanza);

Conversation? conversation = stream_interactor.get_module(ConversationManager.IDENTITY).get_conversation_for_message(message);
if (conversation == null) return;
if (conversation == null) {
return;
}

bool abort = yield received_pipeline.run(message, message_stanza, conversation);
if (abort) return;
if (abort) {
return;
}

if (message.direction == Entities.Message.DIRECTION_RECEIVED) {
message_received(message, conversation);
Expand Down Expand Up @@ -245,6 +249,7 @@ public class MessageProcessor : StreamInteractionModule, Object {

// If the message is a duplicate
if (builder.count() > 0) {
warning("deduplicate by server id");
history_sync.on_server_id_duplicate(account, stanza, message);
return true;
}
Expand All @@ -271,6 +276,11 @@ public class MessageProcessor : StreamInteractionModule, Object {
}
}
bool duplicate = builder.single().row().is_present();

if (duplicate) {
warning("deduplicate by uuid");
}

return duplicate;
}

Expand All @@ -291,7 +301,13 @@ public class MessageProcessor : StreamInteractionModule, Object {
} else {
builder.with_null(db.message.counterpart_resource);
}
return builder.count() > 0;

bool duplicate = builder.count() > 0;
if (duplicate) {
warning("deduplicate by content and metadata");
}

return duplicate;
}

private class DeduplicateMessageListener : MessageListener {
Expand Down Expand Up @@ -357,7 +373,10 @@ public class MessageProcessor : StreamInteractionModule, Object {
}

public override async bool run(Entities.Message message, Xmpp.MessageStanza stanza, Conversation conversation) {
if (message.body == null) return true;
if (message.body == null) {
return true;
}

stream_interactor.get_module(ContentItemStore.IDENTITY).insert_message(message, conversation);
return false;
}
Expand Down
4 changes: 2 additions & 2 deletions libdino/src/service/muc_manager.vala
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,8 @@ public class MucManager : StreamInteractionModule, Object {
if (can_do_mam) {
var history_sync = stream_interactor.get_module(MessageProcessor.IDENTITY).history_sync;
if (conversation == null) {
// We never joined the conversation before, just fetch the latest MAM page
yield history_sync.fetch_latest_page(account, jid.bare_jid, null, new DateTime.from_unix_utc(0), cancellable);
// We never joined the conversation before, fetch latest MAM pages
yield history_sync.fetch_data(account, jid.bare_jid, new DateTime.now(), 10);
} else {
// Fetch everything up to the last time the user actively joined
if (!mucs_sync_cancellables.has_key(account)) {
Expand Down
4 changes: 2 additions & 2 deletions main/src/ui/conversation_content_view/content_populator.vala
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ public class ContentProvider : ContentItemCollection, Object {
return ret;
}

public Gee.List<ContentMetaItem> populate_before(Conversation conversation, ContentItem before_item, int n) {
public Gee.List<ContentMetaItem> populate_before(Conversation conversation, ContentItem before_item, int n, bool request_from_server = true) {
Gee.List<ContentMetaItem> ret = new ArrayList<ContentMetaItem>();
Gee.List<ContentItem> items = stream_interactor.get_module(ContentItemStore.IDENTITY).get_before(conversation, before_item, n);
Gee.List<ContentItem> items = stream_interactor.get_module(ContentItemStore.IDENTITY).get_before(conversation, before_item, n, request_from_server);
foreach (ContentItem item in items) {
ret.add(create_content_meta_item(item));
}
Expand Down
24 changes: 19 additions & 5 deletions main/src/ui/conversation_content_view/conversation_view.vala
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,12 @@ public class ConversationView : Widget, Plugins.ConversationItemCollection, Plug
ContentMetaItem? current_meta_item = null;
double last_y = -1;

private void on_history_loaded(Conversation conversation, ContentItem item, int count) {
// We received new messages from the server
// Load them from the DB, but do not make new request to the server
load_earlier_messages(false);
}

construct {
this.layout_manager = new BinLayout();

Expand Down Expand Up @@ -78,6 +84,9 @@ public class ConversationView : Widget, Plugins.ConversationItemCollection, Plug
scrolled.vadjustment.notify["page-size"].connect(on_upper_notify);
scrolled.vadjustment.notify["value"].connect(on_value_notify);

var content_item_store = stream_interactor.get_module(ContentItemStore.IDENTITY);
content_item_store.history_loaded.connect(on_history_loaded);

content_populator = new ContentProvider(stream_interactor);
subscription_notification = new SubscriptionNotitication(stream_interactor);

Expand Down Expand Up @@ -552,17 +561,22 @@ public class ConversationView : Widget, Plugins.ConversationItemCollection, Plug
}
}

private void load_earlier_messages() {
private void load_earlier_messages(bool request_from_server = true) {
was_value = scrolled.vadjustment.value;
if (!reloading_mutex.trylock()) return;
debug("loading earlier messages");
if (!reloading_mutex.trylock()) {
return;
}

if (content_items.size > 0) {
Gee.List<ContentMetaItem> items = content_populator.populate_before(conversation, ((ContentMetaItem) content_items.first()).content_item, 20);
Gee.List<ContentMetaItem> items = content_populator.populate_before(conversation, ((ContentMetaItem) content_items.first()).content_item, 20, request_from_server);
debug("inserting new messages, size: %d", items.size);
foreach (ContentMetaItem item in items) {
do_insert_item(item);
}
} else {
reloading_mutex.unlock();
}

reloading_mutex.unlock();
}

private void load_later_messages() {
Expand Down

0 comments on commit e02f2c9

Please sign in to comment.