Actor suspension refactoring

Signed-off-by: Thomas Citharel <tcit@tcit.fr>
This commit is contained in:
Thomas Citharel
2021-09-10 11:36:05 +02:00
parent e9fecc4d24
commit 75e254d8b4
23 changed files with 816 additions and 603 deletions

View File

@@ -8,13 +8,12 @@ defmodule Mobilizon.Federation.ActivityPub.Refresher do
alias Mobilizon.Federation.ActivityPub
alias Mobilizon.Federation.ActivityPub.Actor, as: ActivityPubActor
alias Mobilizon.Federation.ActivityPub.{Fetcher, Relay, Transmogrifier, Utils}
alias Mobilizon.Service.ErrorReporting.Sentry
require Logger
@doc """
Refresh a remote profile
"""
@spec refresh_profile(Actor.t()) :: {:ok, Actor.t()}
@spec refresh_profile(Actor.t()) :: {:ok, Actor.t()} | {:error, fetch_actor_errors()} | {:error}
def refresh_profile(%Actor{domain: nil}), do: {:error, "Can only refresh remote actors"}
def refresh_profile(%Actor{type: :Group, url: url, id: group_id} = group) do
@@ -33,74 +32,84 @@ defmodule Mobilizon.Federation.ActivityPub.Refresher do
end
def refresh_profile(%Actor{type: type, url: url}) when type in [:Person, :Application] do
with {:ok, %Actor{outbox_url: outbox_url} = actor} <-
ActivityPubActor.make_actor_from_url(url),
:ok <- fetch_collection(outbox_url, Relay.get_actor()) do
{:ok, actor}
case ActivityPubActor.make_actor_from_url(url) do
{:ok, %Actor{outbox_url: outbox_url} = actor} ->
case fetch_collection(outbox_url, Relay.get_actor()) do
:ok -> {:ok, actor}
{:error, error} -> {:error, error}
end
{:error, error} ->
{:error, error}
end
end
@spec fetch_group(String.t(), Actor.t()) :: :ok
@type fetch_actor_errors :: ActivityPubActor.make_actor_errors() | fetch_collection_errors()
@spec fetch_group(String.t(), Actor.t()) :: :ok | {:error, fetch_actor_errors}
def fetch_group(group_url, %Actor{} = on_behalf_of) do
with {:ok,
%Actor{
outbox_url: outbox_url,
resources_url: resources_url,
members_url: members_url,
posts_url: posts_url,
todos_url: todos_url,
discussions_url: discussions_url,
events_url: events_url
}} <-
ActivityPubActor.make_actor_from_url(group_url),
:ok <- fetch_collection(outbox_url, on_behalf_of),
:ok <- fetch_collection(members_url, on_behalf_of),
:ok <- fetch_collection(resources_url, on_behalf_of),
:ok <- fetch_collection(posts_url, on_behalf_of),
:ok <- fetch_collection(todos_url, on_behalf_of),
:ok <- fetch_collection(discussions_url, on_behalf_of),
:ok <- fetch_collection(events_url, on_behalf_of) do
:ok
else
{:error, :actor_deleted} ->
{:error, :actor_deleted}
case ActivityPubActor.make_actor_from_url(group_url) do
{:ok,
%Actor{
outbox_url: outbox_url,
resources_url: resources_url,
members_url: members_url,
posts_url: posts_url,
todos_url: todos_url,
discussions_url: discussions_url,
events_url: events_url
}} ->
Logger.debug("Fetched group OK, now doing collections")
{:error, :http_error} ->
{:error, :http_error}
with :ok <- fetch_collection(outbox_url, on_behalf_of),
:ok <- fetch_collection(members_url, on_behalf_of),
:ok <- fetch_collection(resources_url, on_behalf_of),
:ok <- fetch_collection(posts_url, on_behalf_of),
:ok <- fetch_collection(todos_url, on_behalf_of),
:ok <- fetch_collection(discussions_url, on_behalf_of),
:ok <- fetch_collection(events_url, on_behalf_of) do
:ok
else
{:error, err}
when err in [:error, :process_error, :fetch_error, :collection_url_nil] ->
Logger.debug("Error while fetching actor collection")
{:error, err}
end
{:error, err} ->
Logger.error("Error while refreshing a group")
Sentry.capture_message("Error while refreshing a group",
extra: %{group_url: group_url}
)
Logger.debug(inspect(err))
{:error, err}
when err in [:actor_deleted, :http_error, :json_decode_error, :actor_is_local] ->
Logger.debug("Error while making actor")
{:error, err}
err ->
Logger.error("Error while refreshing a group")
Sentry.capture_message("Error while refreshing a group",
extra: %{group_url: group_url}
)
Logger.debug(inspect(err))
err
end
end
def fetch_collection(nil, _on_behalf_of), do: :error
@typep fetch_collection_errors :: :process_error | :fetch_error | :collection_url_nil
@spec fetch_collection(String.t() | nil, any) ::
:ok | {:error, fetch_collection_errors}
def fetch_collection(nil, _on_behalf_of), do: {:error, :collection_url_nil}
def fetch_collection(collection_url, on_behalf_of) do
Logger.debug("Fetching and preparing collection from url")
Logger.debug(inspect(collection_url))
with {:ok, data} <- Fetcher.fetch(collection_url, on_behalf_of: on_behalf_of),
:ok <- Logger.debug("Fetch ok, passing to process_collection"),
:ok <- process_collection(data, on_behalf_of) do
Logger.debug("Finished processing a collection")
:ok
case Fetcher.fetch(collection_url, on_behalf_of: on_behalf_of) do
{:ok, data} when is_map(data) ->
Logger.debug("Fetch ok, passing to process_collection")
case process_collection(data, on_behalf_of) do
:ok ->
Logger.debug("Finished processing a collection")
:ok
:error ->
Logger.debug("Failed to process collection #{collection_url}")
{:error, :process_error}
end
{:error, _err} ->
Logger.debug("Failed to fetch collection #{collection_url}")
{:error, :fetch_error}
end
end
@@ -127,6 +136,7 @@ defmodule Mobilizon.Federation.ActivityPub.Refresher do
|> Enum.each(&refresh_profile/1)
end
@spec process_collection(map(), any()) :: :ok | :error
defp process_collection(%{"type" => type, "orderedItems" => items}, _on_behalf_of)
when type in ["OrderedCollection", "OrderedCollectionPage"] do
Logger.debug(
@@ -168,6 +178,8 @@ defmodule Mobilizon.Federation.ActivityPub.Refresher do
defp process_collection(_, _), do: :error
# If we're handling an activity
@spec handling_element(map()) :: {:ok, any, struct} | :error
@spec handling_element(String.t()) :: {:ok, struct} | {:error, any()}
defp handling_element(%{"type" => activity_type} = data)
when activity_type in ["Create", "Update", "Delete"] do
object = get_in(data, ["object"])