implement ActivitySender actor #89

Merged
dessalines merged 28 commits from activity-sender into main 1 year ago
nutomic commented 1 year ago
Owner

TODO:

  • fix federation tests
  • proper error handling without unwrap
  • retry if the destination server is unreachable
  • persistent storage of outgoing activities, so that lemmy can retry sending after a restart (probably leave this for later)
TODO: - [x] fix federation tests - [x] proper error handling without unwrap - [x] retry if the destination server is unreachable - [ ] persistent storage of outgoing activities, so that lemmy can retry sending after a restart (probably leave this for later)
dessalines reviewed 1 year ago
// We cant use ActorType here, because it doesnt implement Sized
#[derive(Message)]
#[rtype(result = "Result<(), LemmyError>")]
Poster
Owner

This can just be #[rtype(result = "()")], because we're not gonna wait for the result.

This can just be `#[rtype(result = "()")]`, because we're not gonna wait for the result.
nutomic marked this conversation as resolved
dessalines reviewed 1 year ago
}
#[derive(Message)]
#[rtype(result = "Result<(), LemmyError>")]
Poster
Owner

Same.

Same.
nutomic marked this conversation as resolved
dessalines reviewed 1 year ago
}
impl Handler<SendCommunityActivity> for ActivitySender {
type Result = Result<(), LemmyError>;
Poster
Owner

type Result = ();

type Result = ();
nutomic marked this conversation as resolved
dessalines reviewed 1 year ago
actor: creator.to_owned(),
to,
};
context.activity_sender().send(message).await??;
Poster
Owner

send is only when you want the result, do_send is when you don't want to wait, which is what we want for these.

https://actix.rs/book/actix/sec-3-address.html#message

`send` is only when you want the result, `do_send` is when you don't want to wait, which is what we want for these. https://actix.rs/book/actix/sec-3-address.html#message
nutomic commented 1 year ago
Poster
Owner

Okay done, although I dont like the fact that this method will fail silently if there is any error. Hopefully that wont happen in our setup.

Okay done, although I dont like the fact that this method will fail silently if there is any error. Hopefully that wont happen in our setup.
dessalines reviewed 1 year ago
.header("Content-Type", "application/json");
let serialised_activity = serialised_activity.clone();
Box::pin(async move {
Poster
Owner

Hrm, not sure why this is required.

Hrm, not sure why this is required.
nutomic commented 1 year ago
Poster
Owner

Because I'm calling some async functions in there and need to await them.

Because I'm calling some async functions in there and need to await them.
nutomic reviewed 1 year ago
impl Job for SendActivityTask {
type State = ();
type Future = Pin<Box<dyn Future<Output = Result<(), Error>>>>;
nutomic commented 1 year ago
Poster
Owner

This is failing to compile, even though it seems identical to the code in ap-relay.

Here's the error message:

This is failing to compile, even though it seems identical to the code in [ap-relay](https://git.asonix.dog/asonix/ap-relay/src/branch/main/src/jobs/deliver.rs). Here's the error message: ![](https://dev.lemmy.ml/pictrs/image/v4kDVr2U99.png)
Poster
Owner

I implemented upsert() functions for user and community. This mostly fixed the behaviour in manual tests as far as I can see. But automated tests are still failing just as before.

One problem is that the same activity is received multiple times. So far I'm not sure why it sends the same activity more than once, as there seems to be no error that would cause a retry. Setting MAX_RETRIES=0 doesnt change this.

Edit: test failures seem somewhat random, for example create post passes one time, and fails when I run the test again a second time.

Also, websocket updates are still not working reliably even after merging main.

Edit 2: Okay one problem I see is that sometimes a comment like is received before the comment create. In that case the comment is fetched and inserted, but once the create comment arrives, it is also inserted. This could be due a race condition in Comment::upsert.

I implemented upsert() functions for user and community. This mostly fixed the behaviour in manual tests as far as I can see. But automated tests are still failing just as before. One problem is that the same activity is received multiple times. So far I'm not sure why it sends the same activity more than once, as there seems to be no error that would cause a retry. Setting MAX_RETRIES=0 doesnt change this. Edit: test failures seem somewhat random, for example create post passes one time, and fails when I run the test again a second time. Also, websocket updates are still not working reliably even after merging main. Edit 2: Okay one problem I see is that sometimes a comment like is received before the comment create. In that case the comment is fetched and inserted, but once the create comment arrives, it is also inserted. This could be due a race condition in Comment::upsert.
Poster
Owner

Yep its definitely a race condition between Comment::upsert() and fetcher::get_or_fetch_and_insert_comment(). It happens roughly 50% of the time. Here is an excerpt from the log:

lemmy-beta_1        | Comment::upsert() (entered into function)
lemmy-beta_1        | Comment::upsert() (checked if comment exists)
lemmy-beta_1        | creating comment http://lemmy-alpha:8540/comment/12
lemmy-beta_1        | [2020-08-24T13:45:04Z DEBUG lemmy_server::apub::fetcher] Fetching and creating remote comment and its parents: http://lemmy-alpha:8540/comment/12
lemmy-alpha_1       | [2020-08-24T13:45:04Z INFO  actix_web::middleware::logger] 172.20.0.13:33194 "GET /comment/12 HTTP/1.1" 200 311 "-" "-" 0.004361
lemmy-beta_1        | creating comment http://lemmy-alpha:8540/comment/12
lemmy-beta_1        | Comment::upsert() (ran the match statement)

So upsert() needs to create an exclusive lock on the comment table, so that nothing else can write while the function is running. The same goes for upsert functions in User, Community and Post.

fetcher::get_or_fetch_and_insert_comment() (and similar functions) will need to be changed, as a comment could be inserted between the calls to Comment::read_from_apub_id() (line 384) and Comment::create() (line 400). Its probably enough to replace Comment::create() with Comment::upsert().

Note: we could avoid this problem to some degree by ensuring that the Like is always sent out after the Create. But in the real world, there is no guarantee that messages arrive in the correct order, so we have to handle this case anyway.

Yep its definitely a race condition between `Comment::upsert()` and `fetcher::get_or_fetch_and_insert_comment()`. It happens roughly 50% of the time. Here is an excerpt from the log: ``` lemmy-beta_1 | Comment::upsert() (entered into function) lemmy-beta_1 | Comment::upsert() (checked if comment exists) lemmy-beta_1 | creating comment http://lemmy-alpha:8540/comment/12 lemmy-beta_1 | [2020-08-24T13:45:04Z DEBUG lemmy_server::apub::fetcher] Fetching and creating remote comment and its parents: http://lemmy-alpha:8540/comment/12 lemmy-alpha_1 | [2020-08-24T13:45:04Z INFO actix_web::middleware::logger] 172.20.0.13:33194 "GET /comment/12 HTTP/1.1" 200 311 "-" "-" 0.004361 lemmy-beta_1 | creating comment http://lemmy-alpha:8540/comment/12 lemmy-beta_1 | Comment::upsert() (ran the match statement) ``` So `upsert()` needs to create an exclusive lock on the comment table, so that nothing else can write while the function is running. The same goes for upsert functions in User, Community and Post. `fetcher::get_or_fetch_and_insert_comment()` (and similar functions) will need to be changed, as a comment could be inserted between the calls to `Comment::read_from_apub_id()` (line 384) and `Comment::create()` (line 400). Its probably enough to replace `Comment::create()` with `Comment::upsert()`. Note: we could avoid this problem to some degree by ensuring that the Like is always sent out after the Create. But in the real world, there is no guarantee that messages arrive in the correct order, so we have to handle this case anyway.
nutomic changed title from WIP: implement ActivitySender actor to implement ActivitySender actor 1 year ago
nutomic reviewed 1 year ago
}
fn create(conn: &PgConnection, comment_form: &CommentForm) -> Result<Self, Error> {
println!("creating {}", &comment_form.ap_id);
nutomic commented 1 year ago
Poster
Owner

I think you already cleaned up the println!() below in upsert in your PR, but this one should also be removed.

I think you already cleaned up the println!() below in upsert in your PR, but this one should also be removed.
dessalines merged commit d4dccd17ae into main 1 year ago
The pull request has been merged as d4dccd17ae.
Sign in to join this conversation.
No reviewers
No Label
No Milestone
No Assignees
2 Participants
Notifications
Due Date

No due date set.

Dependencies

This pull request currently doesn't have any dependencies.

Loading…
There is no content yet.