Browse Source

Organise activity receive files by object type, not by activity type

inbox-refactoring-2
Felix Ableitner 4 months ago
parent
commit
1a3b96b054
15 changed files with 995 additions and 1119 deletions
  1. +0
    -51
      lemmy_apub/src/activities/receive/announce.rs
  2. +307
    -0
      lemmy_apub/src/activities/receive/comment.rs
  3. +0
    -0
      lemmy_apub/src/activities/receive/comment_undo.rs
  4. +130
    -0
      lemmy_apub/src/activities/receive/community.rs
  5. +0
    -129
      lemmy_apub/src/activities/receive/create.rs
  6. +0
    -149
      lemmy_apub/src/activities/receive/delete.rs
  7. +0
    -161
      lemmy_apub/src/activities/receive/dislike.rs
  8. +0
    -136
      lemmy_apub/src/activities/receive/like.rs
  9. +6
    -11
      lemmy_apub/src/activities/receive/mod.rs
  10. +242
    -0
      lemmy_apub/src/activities/receive/post.rs
  11. +0
    -0
      lemmy_apub/src/activities/receive/post_undo.rs
  12. +0
    -151
      lemmy_apub/src/activities/receive/remove.rs
  13. +0
    -184
      lemmy_apub/src/activities/receive/undo.rs
  14. +0
    -137
      lemmy_apub/src/activities/receive/update.rs
  15. +310
    -10
      lemmy_apub/src/inbox/shared_inbox.rs

+ 0
- 51
lemmy_apub/src/activities/receive/announce.rs View File

@ -1,51 +0,0 @@
use crate::{
activities::receive::{
create::receive_create,
delete::receive_delete,
dislike::receive_dislike,
like::receive_like,
receive_unhandled_activity,
remove::receive_remove,
undo::receive_undo,
update::receive_update,
verify_activity_domains_valid,
},
check_is_apub_id_valid,
ActorType,
};
use activitystreams::{activity::*, base::AnyBase, prelude::ExtendsExt};
use actix_web::HttpResponse;
use anyhow::Context;
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::LemmyContext;
/// Takes an announce and passes the inner activity to the appropriate handler.
pub async fn receive_announce(
context: &LemmyContext,
activity: AnyBase,
actor: &dyn ActorType,
) -> Result<HttpResponse, LemmyError> {
let announce = Announce::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&announce, actor.actor_id()?, false)?;
let kind = announce.object().as_single_kind_str();
let object = announce
.object()
.to_owned()
.one()
.context(location_info!())?;
let inner_id = object.id().context(location_info!())?.to_owned();
check_is_apub_id_valid(&inner_id)?;
match kind {
Some("Create") => receive_create(context, object, inner_id).await,
Some("Update") => receive_update(context, object, inner_id).await,
Some("Like") => receive_like(context, object, inner_id).await,
Some("Dislike") => receive_dislike(context, object, inner_id).await,
Some("Delete") => receive_delete(context, object, inner_id).await,
Some("Remove") => receive_remove(context, object, inner_id).await,
Some("Undo") => receive_undo(context, object, inner_id).await,
_ => receive_unhandled_activity(announce),
}
}

+ 307
- 0
lemmy_apub/src/activities/receive/comment.rs View File

@ -0,0 +1,307 @@
use crate::{
activities::receive::{announce_if_community_is_local, get_actor_as_user},
fetcher::get_or_fetch_and_insert_comment,
ActorType,
FromApub,
};
use activitystreams::{
activity::{ActorAndObjectRefExt, Create, Delete, Dislike, Like, Remove, Update},
base::ExtendsExt,
object::Note,
};
use actix_web::HttpResponse;
use anyhow::Context;
use lemmy_db::{
comment::{Comment, CommentForm, CommentLike, CommentLikeForm},
comment_view::CommentView,
post::Post,
Crud,
Likeable,
};
use lemmy_structs::{blocking, comment::CommentResponse, send_local_notifs};
use lemmy_utils::{location_info, utils::scrape_text_for_mentions, LemmyError};
use lemmy_websocket::{messages::SendComment, LemmyContext, UserOperation};
pub(crate) async fn receive_create_comment(
create: Create,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&create, context).await?;
let note = Note::from_any_base(create.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let comment = CommentForm::from_apub(&note, context, Some(user.actor_id()?)).await?;
let inserted_comment =
blocking(context.pool(), move |conn| Comment::upsert(conn, &comment)).await??;
let post_id = inserted_comment.post_id;
let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
// Note:
// Although mentions could be gotten from the post tags (they are included there), or the ccs,
// Its much easier to scrape them from the comment body, since the API has to do that
// anyway.
let mentions = scrape_text_for_mentions(&inserted_comment.content);
let recipient_ids = send_local_notifs(
mentions,
inserted_comment.clone(),
&user,
post,
context.pool(),
true,
)
.await?;
// Refetch the view
let comment_view = blocking(context.pool(), move |conn| {
CommentView::read(conn, inserted_comment.id, None)
})
.await??;
let res = CommentResponse {
comment: comment_view,
recipient_ids,
form_id: None,
};
context.chat_server().do_send(SendComment {
op: UserOperation::CreateComment,
comment: res,
websocket_id: None,
});
announce_if_community_is_local(create, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
pub(crate) async fn receive_update_comment(
update: Update,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let note = Note::from_any_base(update.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let user = get_actor_as_user(&update, context).await?;
let comment = CommentForm::from_apub(&note, context, Some(user.actor_id()?)).await?;
let original_comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
.await?
.id;
let updated_comment = blocking(context.pool(), move |conn| {
Comment::update(conn, original_comment_id, &comment)
})
.await??;
let post_id = updated_comment.post_id;
let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
let mentions = scrape_text_for_mentions(&updated_comment.content);
let recipient_ids = send_local_notifs(
mentions,
updated_comment,
&user,
post,
context.pool(),
false,
)
.await?;
// Refetch the view
let comment_view = blocking(context.pool(), move |conn| {
CommentView::read(conn, original_comment_id, None)
})
.await??;
let res = CommentResponse {
comment: comment_view,
recipient_ids,
form_id: None,
};
context.chat_server().do_send(SendComment {
op: UserOperation::EditComment,
comment: res,
websocket_id: None,
});
announce_if_community_is_local(update, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
pub(crate) async fn receive_like_comment(
like: Like,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let user = get_actor_as_user(&like, context).await?;
let comment = CommentForm::from_apub(&note, context, None).await?;
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
.await?
.id;
let like_form = CommentLikeForm {
comment_id,
post_id: comment.post_id,
user_id: user.id,
score: 1,
};
let user_id = user.id;
blocking(context.pool(), move |conn| {
CommentLike::remove(conn, user_id, comment_id)?;
CommentLike::like(conn, &like_form)
})
.await??;
// Refetch the view
let comment_view = blocking(context.pool(), move |conn| {
CommentView::read(conn, comment_id, None)
})
.await??;
// TODO get those recipient actor ids from somewhere
let recipient_ids = vec![];
let res = CommentResponse {
comment: comment_view,
recipient_ids,
form_id: None,
};
context.chat_server().do_send(SendComment {
op: UserOperation::CreateCommentLike,
comment: res,
websocket_id: None,
});
announce_if_community_is_local(like, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
pub(crate) async fn receive_dislike_comment(
dislike: Dislike,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let note = Note::from_any_base(
dislike
.object()
.to_owned()
.one()
.context(location_info!())?,
)?
.context(location_info!())?;
let user = get_actor_as_user(&dislike, context).await?;
let comment = CommentForm::from_apub(&note, context, None).await?;
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
.await?
.id;
let like_form = CommentLikeForm {
comment_id,
post_id: comment.post_id,
user_id: user.id,
score: -1,
};
let user_id = user.id;
blocking(context.pool(), move |conn| {
CommentLike::remove(conn, user_id, comment_id)?;
CommentLike::like(conn, &like_form)
})
.await??;
// Refetch the view
let comment_view = blocking(context.pool(), move |conn| {
CommentView::read(conn, comment_id, None)
})
.await??;
// TODO get those recipient actor ids from somewhere
let recipient_ids = vec![];
let res = CommentResponse {
comment: comment_view,
recipient_ids,
form_id: None,
};
context.chat_server().do_send(SendComment {
op: UserOperation::CreateCommentLike,
comment: res,
websocket_id: None,
});
announce_if_community_is_local(dislike, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
pub(crate) async fn receive_delete_comment(
context: &LemmyContext,
delete: Delete,
comment: Comment,
) -> Result<HttpResponse, LemmyError> {
let deleted_comment = blocking(context.pool(), move |conn| {
Comment::update_deleted(conn, comment.id, true)
})
.await??;
// Refetch the view
let comment_id = deleted_comment.id;
let comment_view = blocking(context.pool(), move |conn| {
CommentView::read(conn, comment_id, None)
})
.await??;
// TODO get those recipient actor ids from somewhere
let recipient_ids = vec![];
let res = CommentResponse {
comment: comment_view,
recipient_ids,
form_id: None,
};
context.chat_server().do_send(SendComment {
op: UserOperation::EditComment,
comment: res,
websocket_id: None,
});
let user = get_actor_as_user(&delete, context).await?;
announce_if_community_is_local(delete, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
pub(crate) async fn receive_remove_comment(
context: &LemmyContext,
_remove: Remove,
comment: Comment,
) -> Result<HttpResponse, LemmyError> {
let removed_comment = blocking(context.pool(), move |conn| {
Comment::update_removed(conn, comment.id, true)
})
.await??;
// Refetch the view
let comment_id = removed_comment.id;
let comment_view = blocking(context.pool(), move |conn| {
CommentView::read(conn, comment_id, None)
})
.await??;
// TODO get those recipient actor ids from somewhere
let recipient_ids = vec![];
let res = CommentResponse {
comment: comment_view,
recipient_ids,
form_id: None,
};
context.chat_server().do_send(SendComment {
op: UserOperation::EditComment,
comment: res,
websocket_id: None,
});
Ok(HttpResponse::Ok().finish())
}

lemmy_apub/src/activities/receive/undo_comment.rs → lemmy_apub/src/activities/receive/comment_undo.rs View File


+ 130
- 0
lemmy_apub/src/activities/receive/community.rs View File

@ -0,0 +1,130 @@
use crate::activities::receive::{announce_if_community_is_local, get_actor_as_user};
use activitystreams::activity::{Delete, Remove, Undo};
use actix_web::HttpResponse;
use lemmy_db::{community::Community, community_view::CommunityView};
use lemmy_structs::{blocking, community::CommunityResponse};
use lemmy_utils::LemmyError;
use lemmy_websocket::{messages::SendCommunityRoomMessage, LemmyContext, UserOperation};
pub(crate) async fn receive_delete_community(
context: &LemmyContext,
delete: Delete,
community: Community,
) -> Result<HttpResponse, LemmyError> {
let deleted_community = blocking(context.pool(), move |conn| {
Community::update_deleted(conn, community.id, true)
})
.await??;
let community_id = deleted_community.id;
let res = CommunityResponse {
community: blocking(context.pool(), move |conn| {
CommunityView::read(conn, community_id, None)
})
.await??,
};
let community_id = res.community.id;
context.chat_server().do_send(SendCommunityRoomMessage {
op: UserOperation::EditCommunity,
response: res,
community_id,
websocket_id: None,
});
let user = get_actor_as_user(&delete, context).await?;
announce_if_community_is_local(delete, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
pub(crate) async fn receive_remove_community(
context: &LemmyContext,
_remove: Remove,
community: Community,
) -> Result<HttpResponse, LemmyError> {
let removed_community = blocking(context.pool(), move |conn| {
Community::update_removed(conn, community.id, true)
})
.await??;
let community_id = removed_community.id;
let res = CommunityResponse {
community: blocking(context.pool(), move |conn| {
CommunityView::read(conn, community_id, None)
})
.await??,
};
let community_id = res.community.id;
context.chat_server().do_send(SendCommunityRoomMessage {
op: UserOperation::EditCommunity,
response: res,
community_id,
websocket_id: None,
});
Ok(HttpResponse::Ok().finish())
}
pub(crate) async fn receive_undo_delete_community(
context: &LemmyContext,
undo: Undo,
community: Community,
) -> Result<HttpResponse, LemmyError> {
let deleted_community = blocking(context.pool(), move |conn| {
Community::update_deleted(conn, community.id, false)
})
.await??;
let community_id = deleted_community.id;
let res = CommunityResponse {
community: blocking(context.pool(), move |conn| {
CommunityView::read(conn, community_id, None)
})
.await??,
};
let community_id = res.community.id;
context.chat_server().do_send(SendCommunityRoomMessage {
op: UserOperation::EditCommunity,
response: res,
community_id,
websocket_id: None,
});
let user = get_actor_as_user(&undo, context).await?;
announce_if_community_is_local(undo, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
pub(crate) async fn receive_undo_remove_community(
context: &LemmyContext,
undo: Undo,
community: Community,
) -> Result<HttpResponse, LemmyError> {
let removed_community = blocking(context.pool(), move |conn| {
Community::update_removed(conn, community.id, false)
})
.await??;
let community_id = removed_community.id;
let res = CommunityResponse {
community: blocking(context.pool(), move |conn| {
CommunityView::read(conn, community_id, None)
})
.await??,
};
let community_id = res.community.id;
context.chat_server().do_send(SendCommunityRoomMessage {
op: UserOperation::EditCommunity,
response: res,
community_id,
websocket_id: None,
});
let mod_ = get_actor_as_user(&undo, context).await?;
announce_if_community_is_local(undo, &mod_, context).await?;
Ok(HttpResponse::Ok().finish())
}

+ 0
- 129
lemmy_apub/src/activities/receive/create.rs View File

@ -1,129 +0,0 @@
use crate::{
activities::receive::{
announce_if_community_is_local,
get_actor_as_user,
receive_unhandled_activity,
verify_activity_domains_valid,
},
ActorType,
FromApub,
PageExt,
};
use activitystreams::{activity::Create, base::AnyBase, object::Note, prelude::*};
use actix_web::HttpResponse;
use anyhow::Context;
use lemmy_db::{
comment::{Comment, CommentForm},
comment_view::CommentView,
post::{Post, PostForm},
post_view::PostView,
};
use lemmy_structs::{blocking, comment::CommentResponse, post::PostResponse, send_local_notifs};
use lemmy_utils::{location_info, utils::scrape_text_for_mentions, LemmyError};
use lemmy_websocket::{
messages::{SendComment, SendPost},
LemmyContext,
UserOperation,
};
use url::Url;
pub async fn receive_create(
context: &LemmyContext,
activity: AnyBase,
expected_domain: Url,
) -> Result<HttpResponse, LemmyError> {
let create = Create::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&create, expected_domain, true)?;
match create.object().as_single_kind_str() {
Some("Page") => receive_create_post(create, context).await,
Some("Note") => receive_create_comment(create, context).await,
_ => receive_unhandled_activity(create),
}
}
async fn receive_create_post(
create: Create,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&create, context).await?;
let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let post = PostForm::from_apub(&page, context, Some(user.actor_id()?)).await?;
// Using an upsert, since likes (which fetch the post), sometimes come in before the create
// resulting in double posts.
let inserted_post = blocking(context.pool(), move |conn| Post::upsert(conn, &post)).await??;
// Refetch the view
let inserted_post_id = inserted_post.id;
let post_view = blocking(context.pool(), move |conn| {
PostView::read(conn, inserted_post_id, None)
})
.await??;
let res = PostResponse { post: post_view };
context.chat_server().do_send(SendPost {
op: UserOperation::CreatePost,
post: res,
websocket_id: None,
});
announce_if_community_is_local(create, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
async fn receive_create_comment(
create: Create,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&create, context).await?;
let note = Note::from_any_base(create.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let comment = CommentForm::from_apub(&note, context, Some(user.actor_id()?)).await?;
let inserted_comment =
blocking(context.pool(), move |conn| Comment::upsert(conn, &comment)).await??;
let post_id = inserted_comment.post_id;
let post = blocking(context.pool(), move |conn| Post::read(conn, post_id)).await??;
// Note:
// Although mentions could be gotten from the post tags (they are included there), or the ccs,
// Its much easier to scrape them from the comment body, since the API has to do that
// anyway.
let mentions = scrape_text_for_mentions(&inserted_comment.content);
let recipient_ids = send_local_notifs(
mentions,
inserted_comment.clone(),
&user,
post,
context.pool(),
true,
)
.await?;
// Refetch the view
let comment_view = blocking(context.pool(), move |conn| {
CommentView::read(conn, inserted_comment.id, None)
})
.await??;
let res = CommentResponse {
comment: comment_view,
recipient_ids,
form_id: None,
};
context.chat_server().do_send(SendComment {
op: UserOperation::CreateComment,
comment: res,
websocket_id: None,
});
announce_if_community_is_local(create, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}

+ 0
- 149
lemmy_apub/src/activities/receive/delete.rs View File

@ -1,149 +0,0 @@
use crate::activities::receive::{
announce_if_community_is_local,
find_by_id,
get_actor_as_user,
verify_activity_domains_valid,
FindResults,
};
use activitystreams::{activity::Delete, base::AnyBase, prelude::*};
use actix_web::HttpResponse;
use anyhow::Context;
use lemmy_db::{
comment::Comment,
comment_view::CommentView,
community::Community,
community_view::CommunityView,
post::Post,
post_view::PostView,
};
use lemmy_structs::{
blocking,
comment::CommentResponse,
community::CommunityResponse,
post::PostResponse,
};
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::{
messages::{SendComment, SendCommunityRoomMessage, SendPost},
LemmyContext,
UserOperation,
};
use url::Url;
pub async fn receive_delete(
context: &LemmyContext,
activity: AnyBase,
expected_domain: Url,
) -> Result<HttpResponse, LemmyError> {
let delete = Delete::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&delete, expected_domain, true)?;
let object = delete
.object()
.to_owned()
.single_xsd_any_uri()
.context(location_info!())?;
match find_by_id(context, object).await {
Ok(FindResults::Post(p)) => receive_delete_post(context, delete, p).await,
Ok(FindResults::Comment(c)) => receive_delete_comment(context, delete, c).await,
Ok(FindResults::Community(c)) => receive_delete_community(context, delete, c).await,
// if we dont have the object, no need to do anything
Err(_) => Ok(HttpResponse::Ok().finish()),
}
}
async fn receive_delete_post(
context: &LemmyContext,
delete: Delete,
post: Post,
) -> Result<HttpResponse, LemmyError> {
let deleted_post = blocking(context.pool(), move |conn| {
Post::update_deleted(conn, post.id, true)
})
.await??;
// Refetch the view
let post_id = deleted_post.id;
let post_view = blocking(context.pool(), move |conn| {
PostView::read(conn, post_id, None)
})
.await??;
let res = PostResponse { post: post_view };
context.chat_server().do_send(SendPost {
op: UserOperation::EditPost,
post: res,
websocket_id: None,
});
let user = get_actor_as_user(&delete, context).await?;
announce_if_community_is_local(delete, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
async fn receive_delete_comment(
context: &LemmyContext,
delete: Delete,
comment: Comment,
) -> Result<HttpResponse, LemmyError> {
let deleted_comment = blocking(context.pool(), move |conn| {
Comment::update_deleted(conn, comment.id, true)
})
.await??;
// Refetch the view
let comment_id = deleted_comment.id;
let comment_view = blocking(context.pool(), move |conn| {
CommentView::read(conn, comment_id, None)
})
.await??;
// TODO get those recipient actor ids from somewhere
let recipient_ids = vec![];
let res = CommentResponse {
comment: comment_view,
recipient_ids,
form_id: None,
};
context.chat_server().do_send(SendComment {
op: UserOperation::EditComment,
comment: res,
websocket_id: None,
});
let user = get_actor_as_user(&delete, context).await?;
announce_if_community_is_local(delete, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
async fn receive_delete_community(
context: &LemmyContext,
delete: Delete,
community: Community,
) -> Result<HttpResponse, LemmyError> {
let deleted_community = blocking(context.pool(), move |conn| {
Community::update_deleted(conn, community.id, true)
})
.await??;
let community_id = deleted_community.id;
let res = CommunityResponse {
community: blocking(context.pool(), move |conn| {
CommunityView::read(conn, community_id, None)
})
.await??,
};
let community_id = res.community.id;
context.chat_server().do_send(SendCommunityRoomMessage {
op: UserOperation::EditCommunity,
response: res,
community_id,
websocket_id: None,
});
let user = get_actor_as_user(&delete, context).await?;
announce_if_community_is_local(delete, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}

+ 0
- 161
lemmy_apub/src/activities/receive/dislike.rs View File

@ -1,161 +0,0 @@
use crate::{
activities::receive::{
announce_if_community_is_local,
get_actor_as_user,
receive_unhandled_activity,
verify_activity_domains_valid,
},
fetcher::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post},
FromApub,
PageExt,
};
use activitystreams::{activity::Dislike, base::AnyBase, object::Note, prelude::*};
use actix_web::HttpResponse;
use anyhow::Context;
use lemmy_db::{
comment::{CommentForm, CommentLike, CommentLikeForm},
comment_view::CommentView,
post::{PostForm, PostLike, PostLikeForm},
post_view::PostView,
site::Site,
Crud,
Likeable,
};
use lemmy_structs::{blocking, comment::CommentResponse, post::PostResponse};
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::{
messages::{SendComment, SendPost},
LemmyContext,
UserOperation,
};
use url::Url;
pub async fn receive_dislike(
context: &LemmyContext,
activity: AnyBase,
expected_domain: Url,
) -> Result<HttpResponse, LemmyError> {
let enable_downvotes = blocking(context.pool(), move |conn| {
Site::read(conn, 1).map(|s| s.enable_downvotes)
})
.await??;
if !enable_downvotes {
return Ok(HttpResponse::Ok().finish());
}
let dislike = Dislike::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&dislike, expected_domain, false)?;
match dislike.object().as_single_kind_str() {
Some("Page") => receive_dislike_post(dislike, context).await,
Some("Note") => receive_dislike_comment(dislike, context).await,
_ => receive_unhandled_activity(dislike),
}
}
async fn receive_dislike_post(
dislike: Dislike,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&dislike, context).await?;
let page = PageExt::from_any_base(
dislike
.object()
.to_owned()
.one()
.context(location_info!())?,
)?
.context(location_info!())?;
let post = PostForm::from_apub(&page, context, None).await?;
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
.await?
.id;
let like_form = PostLikeForm {
post_id,
user_id: user.id,
score: -1,
};
let user_id = user.id;
blocking(context.pool(), move |conn| {
PostLike::remove(conn, user_id, post_id)?;
PostLike::like(conn, &like_form)
})
.await??;
// Refetch the view
let post_view = blocking(context.pool(), move |conn| {
PostView::read(conn, post_id, None)
})
.await??;
let res = PostResponse { post: post_view };
context.chat_server().do_send(SendPost {
op: UserOperation::CreatePostLike,
post: res,
websocket_id: None,
});
announce_if_community_is_local(dislike, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
async fn receive_dislike_comment(
dislike: Dislike,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let note = Note::from_any_base(
dislike
.object()
.to_owned()
.one()
.context(location_info!())?,
)?
.context(location_info!())?;
let user = get_actor_as_user(&dislike, context).await?;
let comment = CommentForm::from_apub(&note, context, None).await?;
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
.await?
.id;
let like_form = CommentLikeForm {
comment_id,
post_id: comment.post_id,
user_id: user.id,
score: -1,
};
let user_id = user.id;
blocking(context.pool(), move |conn| {
CommentLike::remove(conn, user_id, comment_id)?;
CommentLike::like(conn, &like_form)
})
.await??;
// Refetch the view
let comment_view = blocking(context.pool(), move |conn| {
CommentView::read(conn, comment_id, None)
})
.await??;
// TODO get those recipient actor ids from somewhere
let recipient_ids = vec![];
let res = CommentResponse {
comment: comment_view,
recipient_ids,
form_id: None,
};
context.chat_server().do_send(SendComment {
op: UserOperation::CreateCommentLike,
comment: res,
websocket_id: None,
});
announce_if_community_is_local(dislike, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}

+ 0
- 136
lemmy_apub/src/activities/receive/like.rs View File

@ -1,136 +0,0 @@
use crate::{
activities::receive::{
announce_if_community_is_local,
get_actor_as_user,
receive_unhandled_activity,
verify_activity_domains_valid,
},
fetcher::{get_or_fetch_and_insert_comment, get_or_fetch_and_insert_post},
FromApub,
PageExt,
};
use activitystreams::{activity::Like, base::AnyBase, object::Note, prelude::*};
use actix_web::HttpResponse;
use anyhow::Context;
use lemmy_db::{
comment::{CommentForm, CommentLike, CommentLikeForm},
comment_view::CommentView,
post::{PostForm, PostLike, PostLikeForm},
post_view::PostView,
Likeable,
};
use lemmy_structs::{blocking, comment::CommentResponse, post::PostResponse};
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::{
messages::{SendComment, SendPost},
LemmyContext,
UserOperation,
};
use url::Url;
pub async fn receive_like(
context: &LemmyContext,
activity: AnyBase,
expected_domain: Url,
) -> Result<HttpResponse, LemmyError> {
let like = Like::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&like, expected_domain, false)?;
match like.object().as_single_kind_str() {
Some("Page") => receive_like_post(like, context).await,
Some("Note") => receive_like_comment(like, context).await,
_ => receive_unhandled_activity(like),
}
}
async fn receive_like_post(like: Like, context: &LemmyContext) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&like, context).await?;
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let post = PostForm::from_apub(&page, context, None).await?;
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
.await?
.id;
let like_form = PostLikeForm {
post_id,
user_id: user.id,
score: 1,
};
let user_id = user.id;
blocking(context.pool(), move |conn| {
PostLike::remove(conn, user_id, post_id)?;
PostLike::like(conn, &like_form)
})
.await??;
// Refetch the view
let post_view = blocking(context.pool(), move |conn| {
PostView::read(conn, post_id, None)
})
.await??;
let res = PostResponse { post: post_view };
context.chat_server().do_send(SendPost {
op: UserOperation::CreatePostLike,
post: res,
websocket_id: None,
});
announce_if_community_is_local(like, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
async fn receive_like_comment(
like: Like,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let note = Note::from_any_base(like.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let user = get_actor_as_user(&like, context).await?;
let comment = CommentForm::from_apub(&note, context, None).await?;
let comment_id = get_or_fetch_and_insert_comment(&comment.get_ap_id()?, context)
.await?
.id;
let like_form = CommentLikeForm {
comment_id,
post_id: comment.post_id,
user_id: user.id,
score: 1,
};
let user_id = user.id;
blocking(context.pool(), move |conn| {
CommentLike::remove(conn, user_id, comment_id)?;
CommentLike::like(conn, &like_form)
})
.await??;
// Refetch the view
let comment_view = blocking(context.pool(), move |conn| {
CommentView::read(conn, comment_id, None)
})
.await??;
// TODO get those recipient actor ids from somewhere
let recipient_ids = vec![];
let res = CommentResponse {
comment: comment_view,
recipient_ids,
form_id: None,
};
context.chat_server().do_send(SendComment {
op: UserOperation::CreateCommentLike,
comment: res,
websocket_id: None,
});
announce_if_community_is_local(like, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}

+ 6
- 11
lemmy_apub/src/activities/receive/mod.rs View File

@ -20,19 +20,14 @@ use serde::Serialize;
use std::fmt::Debug;
use url::Url;
pub mod announce;
pub mod create;
pub mod delete;
pub mod dislike;
pub mod like;
pub mod remove;
pub mod undo;
mod undo_comment;
mod undo_post;
pub mod update;
pub(crate) mod comment;
pub(crate) mod comment_undo;
pub(crate) mod community;
pub(crate) mod post;
pub(crate) mod post_undo;
/// Return HTTP 501 for unsupported activities in inbox.
fn receive_unhandled_activity<A>(activity: A) -> Result<HttpResponse, LemmyError>
pub(crate) fn receive_unhandled_activity<A>(activity: A) -> Result<HttpResponse, LemmyError>
where
A: Debug,
{


+ 242
- 0
lemmy_apub/src/activities/receive/post.rs View File

@ -0,0 +1,242 @@
use crate::{
activities::receive::{announce_if_community_is_local, get_actor_as_user},
fetcher::get_or_fetch_and_insert_post,
ActorType,
FromApub,
PageExt,
};
use activitystreams::{
activity::{Create, Delete, Dislike, Like, Remove, Update},
prelude::*,
};
use actix_web::HttpResponse;
use anyhow::Context;
use lemmy_db::{
post::{Post, PostForm, PostLike, PostLikeForm},
post_view::PostView,
Crud,
Likeable,
};
use lemmy_structs::{blocking, post::PostResponse};
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::{messages::SendPost, LemmyContext, UserOperation};
pub(crate) async fn receive_create_post(
create: Create,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&create, context).await?;
let page = PageExt::from_any_base(create.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let post = PostForm::from_apub(&page, context, Some(user.actor_id()?)).await?;
// Using an upsert, since likes (which fetch the post), sometimes come in before the create
// resulting in double posts.
let inserted_post = blocking(context.pool(), move |conn| Post::upsert(conn, &post)).await??;
// Refetch the view
let inserted_post_id = inserted_post.id;
let post_view = blocking(context.pool(), move |conn| {
PostView::read(conn, inserted_post_id, None)
})
.await??;
let res = PostResponse { post: post_view };
context.chat_server().do_send(SendPost {
op: UserOperation::CreatePost,
post: res,
websocket_id: None,
});
announce_if_community_is_local(create, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
pub(crate) async fn receive_update_post(
update: Update,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&update, context).await?;
let page = PageExt::from_any_base(update.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let post = PostForm::from_apub(&page, context, Some(user.actor_id()?)).await?;
let original_post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
.await?
.id;
blocking(context.pool(), move |conn| {
Post::update(conn, original_post_id, &post)
})
.await??;
// Refetch the view
let post_view = blocking(context.pool(), move |conn| {
PostView::read(conn, original_post_id, None)
})
.await??;
let res = PostResponse { post: post_view };
context.chat_server().do_send(SendPost {
op: UserOperation::EditPost,
post: res,
websocket_id: None,
});
announce_if_community_is_local(update, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
pub(crate) async fn receive_like_post(
like: Like,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&like, context).await?;
let page = PageExt::from_any_base(like.object().to_owned().one().context(location_info!())?)?
.context(location_info!())?;
let post = PostForm::from_apub(&page, context, None).await?;
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
.await?
.id;
let like_form = PostLikeForm {
post_id,
user_id: user.id,
score: 1,
};
let user_id = user.id;
blocking(context.pool(), move |conn| {
PostLike::remove(conn, user_id, post_id)?;
PostLike::like(conn, &like_form)
})
.await??;
// Refetch the view
let post_view = blocking(context.pool(), move |conn| {
PostView::read(conn, post_id, None)
})
.await??;
let res = PostResponse { post: post_view };
context.chat_server().do_send(SendPost {
op: UserOperation::CreatePostLike,
post: res,
websocket_id: None,
});
announce_if_community_is_local(like, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
pub(crate) async fn receive_dislike_post(
dislike: Dislike,
context: &LemmyContext,
) -> Result<HttpResponse, LemmyError> {
let user = get_actor_as_user(&dislike, context).await?;
let page = PageExt::from_any_base(
dislike
.object()
.to_owned()
.one()
.context(location_info!())?,
)?
.context(location_info!())?;
let post = PostForm::from_apub(&page, context, None).await?;
let post_id = get_or_fetch_and_insert_post(&post.get_ap_id()?, context)
.await?
.id;
let like_form = PostLikeForm {
post_id,
user_id: user.id,
score: -1,
};
let user_id = user.id;
blocking(context.pool(), move |conn| {
PostLike::remove(conn, user_id, post_id)?;
PostLike::like(conn, &like_form)
})
.await??;
// Refetch the view
let post_view = blocking(context.pool(), move |conn| {
PostView::read(conn, post_id, None)
})
.await??;
let res = PostResponse { post: post_view };
context.chat_server().do_send(SendPost {
op: UserOperation::CreatePostLike,
post: res,
websocket_id: None,
});
announce_if_community_is_local(dislike, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
pub(crate) async fn receive_delete_post(
context: &LemmyContext,
delete: Delete,
post: Post,
) -> Result<HttpResponse, LemmyError> {
let deleted_post = blocking(context.pool(), move |conn| {
Post::update_deleted(conn, post.id, true)
})
.await??;
// Refetch the view
let post_id = deleted_post.id;
let post_view = blocking(context.pool(), move |conn| {
PostView::read(conn, post_id, None)
})
.await??;
let res = PostResponse { post: post_view };
context.chat_server().do_send(SendPost {
op: UserOperation::EditPost,
post: res,
websocket_id: None,
});
let user = get_actor_as_user(&delete, context).await?;
announce_if_community_is_local(delete, &user, context).await?;
Ok(HttpResponse::Ok().finish())
}
pub(crate) async fn receive_remove_post(
context: &LemmyContext,
_remove: Remove,
post: Post,
) -> Result<HttpResponse, LemmyError> {
let removed_post = blocking(context.pool(), move |conn| {
Post::update_removed(conn, post.id, true)
})
.await??;
// Refetch the view
let post_id = removed_post.id;
let post_view = blocking(context.pool(), move |conn| {
PostView::read(conn, post_id, None)
})
.await??;
let res = PostResponse { post: post_view };
context.chat_server().do_send(SendPost {
op: UserOperation::EditPost,
post: res,
websocket_id: None,
});
Ok(HttpResponse::Ok().finish())
}

lemmy_apub/src/activities/receive/undo_post.rs → lemmy_apub/src/activities/receive/post_undo.rs View File


+ 0
- 151
lemmy_apub/src/activities/receive/remove.rs View File

@ -1,151 +0,0 @@
use crate::activities::receive::{find_by_id, verify_activity_domains_valid, FindResults};
use activitystreams::{activity::Remove, base::AnyBase, prelude::*};
use actix_web::HttpResponse;
use anyhow::Context;
use lemmy_db::{
comment::Comment,
comment_view::CommentView,
community::Community,
community_view::CommunityView,
post::Post,
post_view::PostView,
};
use lemmy_structs::{
blocking,
comment::CommentResponse,
community::CommunityResponse,
post::PostResponse,
};
use lemmy_utils::{location_info, LemmyError};
use lemmy_websocket::{
messages::{SendComment, SendCommunityRoomMessage, SendPost},
LemmyContext,
UserOperation,
};
use url::Url;
pub async fn receive_remove(
context: &LemmyContext,
activity: AnyBase,
expected_domain: Url,
) -> Result<HttpResponse, LemmyError> {
let remove = Remove::from_any_base(activity)?.context(location_info!())?;
verify_activity_domains_valid(&remove, expected_domain, false)?;
let cc = remove
.cc()
.map(|c| c.as_many())
.flatten()
.context(location_info!())?;
let community_id = cc
.first()
.map(|c| c.as_xsd_any_uri())
.flatten()
.context(location_info!())?;
let object = remove
.object()
.to_owned()
.single_xsd_any_uri()
.context(location_info!())?;
// Ensure that remove activity comes from the same domain as the community
remove.id(community_id.domain().context(location_info!())?)?;
match find_by_id(context, object).await {
Ok(FindResults::Post(p)) => receive_remove_post(context, remove, p).await,
Ok(FindResults::Comment(c)) => receive_remove_comment(context, remove, c).await,
Ok(FindResults::Community(c)) => receive_remove_community(context, remove, c).await,
// if we dont have the object, no need to do anything
Err(_) => Ok(HttpResponse::Ok().finish()),
}
}
async fn receive_remove_post(
context: &LemmyContext,
_remove: Remove,
post: Post,
) -> Result<HttpResponse, LemmyError> {
let removed_post = blocking(context.pool(), move |conn| {
Post::update_removed(conn, post.id, true)
})
.await??;
// Refetch the view
let post_id = removed_post.id;
let post_view = blocking(context.pool(), move |conn| {
PostView::read(conn, post_id, None)
})
.await??;
let res = PostResponse { post: post_view };
context.chat_server().do_send(SendPost {
op: UserOperation::EditPost,
post: res,
websocket_id: None,
});
Ok(HttpResponse::Ok().finish())
}
async fn receive_remove_comment(
context: &LemmyContext,
_remove: Remove,
comment: Comment,
) -> Result<HttpResponse, LemmyError> {
let removed_comment = blocking(context.pool(), move |conn| {
Comment::update_removed(conn, comment.id, true)
})
.await??;
// Refetch the view
let comment_id = removed_comment.id;
let comment_view = blocking(context.pool(), move |conn| {
CommentView::read(conn, comment_id, None)
})
.await??;
// TODO get those recipient actor ids from somewhere
let recipient_ids = vec![];
let res = CommentResponse {
comment: comment_view,
recipient_ids,
form_id: None,
};
context.chat_server().do_send(SendComment {
op: UserOperation::EditComment,
comment: res,
websocket_id: None,
});
Ok(HttpResponse::Ok().finish())
}
async fn receive_remove_community(
context: &LemmyContext,
_remove: Remove,
community: Community,
) -> Result<HttpResponse, LemmyError> {
let removed_community = blocking(context.pool(), move |conn| {
Community::update_removed(conn, community.id, true)
})
.await??;
let community_id = removed_community.id;
let res = CommunityResponse {
community: blocking(context.pool(), move |conn| {