#89 implement ActivitySender actor

Merged
dessalines merged 28 commits from activity-sender into main 5 months ago
nutomic commented 6 months ago
Owner

TODO:

TODO: - [x] fix federation tests - [x] proper error handling without unwrap - [x] retry if the destination server is unreachable - [ ] persistent storage of outgoing activities, so that lemmy can retry sending after a restart (probably leave this for later)
dessalines reviewed 6 months ago
server/src/apub/activity_sender.rs
// We cant use ActorType here, because it doesnt implement Sized
#[derive(Message)]
#[rtype(result = "Result<(), LemmyError>")]
dessalines commented 6 months ago

This can just be #[rtype(result = "()")], because we're not gonna wait for the result.

This can just be `#[rtype(result = "()")]`, because we're not gonna wait for the result.
nutomic marked this conversation as resolved
dessalines reviewed 6 months ago
server/src/apub/activity_sender.rs
}
#[derive(Message)]
#[rtype(result = "Result<(), LemmyError>")]
dessalines commented 6 months ago

Same.

Same.
nutomic marked this conversation as resolved
dessalines reviewed 6 months ago
server/src/apub/activity_sender.rs
}
impl Handler<SendCommunityActivity> for ActivitySender {
type Result = Result<(), LemmyError>;
dessalines commented 6 months ago

type Result = ();

type Result = ();
nutomic marked this conversation as resolved
dessalines reviewed 6 months ago
server/src/apub/activities.rs Outdated
actor: creator.to_owned(),
to,
};
context.activity_sender().send(message).await??;
dessalines commented 6 months ago

send is only when you want the result, do_send is when you don't want to wait, which is what we want for these.

https://actix.rs/book/actix/sec-3-address.html#message

`send` is only when you want the result, `do_send` is when you don't want to wait, which is what we want for these. https://actix.rs/book/actix/sec-3-address.html#message
nutomic commented 6 months ago

Okay done, although I dont like the fact that this method will fail silently if there is any error. Hopefully that wont happen in our setup.

Okay done, although I dont like the fact that this method will fail silently if there is any error. Hopefully that wont happen in our setup.
dessalines reviewed 6 months ago
server/src/apub/activity_sender.rs Outdated
.header("Content-Type", "application/json");
let serialised_activity = serialised_activity.clone();
Box::pin(async move {
dessalines commented 6 months ago

Hrm, not sure why this is required.

Hrm, not sure why this is required.
nutomic commented 6 months ago

Because I'm calling some async functions in there and need to await them.

Because I'm calling some async functions in there and need to await them.
nutomic reviewed 6 months ago
server/src/apub/activity_queue.rs Outdated
impl Job for SendActivityTask {
type State = ();
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
nutomic commented 6 months ago

This is failing to compile, even though it seems identical to the code in ap-relay.

Here's the error message:

This is failing to compile, even though it seems identical to the code in [ap-relay](https://git.asonix.dog/asonix/ap-relay/src/branch/main/src/jobs/deliver.rs). Here's the error message: ![](https://dev.lemmy.ml/pictrs/image/v4kDVr2U99.png)
nutomic commented 6 months ago
Poster
Owner

I implemented upsert() functions for user and community. This mostly fixed the behaviour in manual tests as far as I can see. But automated tests are still failing just as before.

One problem is that the same activity is received multiple times. So far I'm not sure why it sends the same activity more than once, as there seems to be no error that would cause a retry. Setting MAX_RETRIES=0 doesnt change this.

Edit: test failures seem somewhat random, for example create post passes one time, and fails when I run the test again a second time.

Also, websocket updates are still not working reliably even after merging main.

Edit 2: Okay one problem I see is that sometimes a comment like is received before the comment create. In that case the comment is fetched and inserted, but once the create comment arrives, it is also inserted. This could be due a race condition in Comment::upsert.

I implemented upsert() functions for user and community. This mostly fixed the behaviour in manual tests as far as I can see. But automated tests are still failing just as before. One problem is that the same activity is received multiple times. So far I'm not sure why it sends the same activity more than once, as there seems to be no error that would cause a retry. Setting MAX_RETRIES=0 doesnt change this. Edit: test failures seem somewhat random, for example create post passes one time, and fails when I run the test again a second time. Also, websocket updates are still not working reliably even after merging main. Edit 2: Okay one problem I see is that sometimes a comment like is received before the comment create. In that case the comment is fetched and inserted, but once the create comment arrives, it is also inserted. This could be due a race condition in Comment::upsert.
nutomic commented 6 months ago
Poster
Owner

Yep its definitely a race condition between Comment::upsert() and fetcher::get_or_fetch_and_insert_comment(). It happens roughly 50% of the time. Here is an excerpt from the log:

lemmy-beta_1        | Comment::upsert() (entered into function)
lemmy-beta_1        | Comment::upsert() (checked if comment exists)
lemmy-beta_1        | creating comment http://lemmy-alpha:8540/comment/12
lemmy-beta_1        | [2020-08-24T13:45:04Z DEBUG lemmy_server::apub::fetcher] Fetching and creating remote comment and its parents: http://lemmy-alpha:8540/comment/12
lemmy-alpha_1       | [2020-08-24T13:45:04Z INFO  actix_web::middleware::logger] 172.20.0.13:33194 "GET /comment/12 HTTP/1.1" 200 311 "-" "-" 0.004361
lemmy-beta_1        | creating comment http://lemmy-alpha:8540/comment/12
lemmy-beta_1        | Comment::upsert() (ran the match statement)

So upsert() needs to create an exclusive lock on the comment table, so that nothing else can write while the function is running. The same goes for upsert functions in User, Community and Post.

fetcher::get_or_fetch_and_insert_comment() (and similar functions) will need to be changed, as a comment could be inserted between the calls to Comment::read_from_apub_id() (line 384) and Comment::create() (line 400). Its probably enough to replace Comment::create() with Comment::upsert().

Note: we could avoid this problem to some degree by ensuring that the Like is always sent out after the Create. But in the real world, there is no guarantee that messages arrive in the correct order, so we have to handle this case anyway.

Yep its definitely a race condition between `Comment::upsert()` and `fetcher::get_or_fetch_and_insert_comment()`. It happens roughly 50% of the time. Here is an excerpt from the log: ``` lemmy-beta_1 | Comment::upsert() (entered into function) lemmy-beta_1 | Comment::upsert() (checked if comment exists) lemmy-beta_1 | creating comment http://lemmy-alpha:8540/comment/12 lemmy-beta_1 | [2020-08-24T13:45:04Z DEBUG lemmy_server::apub::fetcher] Fetching and creating remote comment and its parents: http://lemmy-alpha:8540/comment/12 lemmy-alpha_1 | [2020-08-24T13:45:04Z INFO actix_web::middleware::logger] 172.20.0.13:33194 "GET /comment/12 HTTP/1.1" 200 311 "-" "-" 0.004361 lemmy-beta_1 | creating comment http://lemmy-alpha:8540/comment/12 lemmy-beta_1 | Comment::upsert() (ran the match statement) ``` So `upsert()` needs to create an exclusive lock on the comment table, so that nothing else can write while the function is running. The same goes for upsert functions in User, Community and Post. `fetcher::get_or_fetch_and_insert_comment()` (and similar functions) will need to be changed, as a comment could be inserted between the calls to `Comment::read_from_apub_id()` (line 384) and `Comment::create()` (line 400). Its probably enough to replace `Comment::create()` with `Comment::upsert()`. Note: we could avoid this problem to some degree by ensuring that the Like is always sent out after the Create. But in the real world, there is no guarantee that messages arrive in the correct order, so we have to handle this case anyway.
nutomic changed title from WIP: implement ActivitySender actor to implement ActivitySender actor 6 months ago
nutomic reviewed 5 months ago
server/lemmy_db/src/comment.rs Outdated
}
fn create(conn: &PgConnection, comment_form: &CommentForm) -> Result<Self, Error> {
println!("creating {}", &comment_form.ap_id);
nutomic commented 5 months ago

I think you already cleaned up the println!() below in upsert in your PR, but this one should also be removed.

I think you already cleaned up the println!() below in upsert in your PR, but this one should also be removed.
dessalines merged commit d4dccd17ae into main 5 months ago
The pull request has been merged as d4dccd17ae.
Sign in to join this conversation.
No reviewers
No Label
No Milestone
No Assignees
2 Participants
Notifications
Due Date

No due date set.

Dependencies

This pull request currently doesn't have any dependencies.

Loading…
There is no content yet.