Refactor Mobilizon.Federation.ActivityPub and add typespecs
Signed-off-by: Thomas Citharel <tcit@tcit.fr>
This commit is contained in:
@@ -19,10 +19,12 @@ defmodule Mobilizon.Federation.ActivityPub.Federator do
|
||||
|
||||
@max_jobs 20
|
||||
|
||||
@spec init(any()) :: {:ok, any()}
|
||||
def init(args) do
|
||||
{:ok, args}
|
||||
end
|
||||
|
||||
@spec start_link(any) :: GenServer.on_start()
|
||||
def start_link(_) do
|
||||
spawn(fn ->
|
||||
# 1 minute
|
||||
@@ -39,6 +41,8 @@ defmodule Mobilizon.Federation.ActivityPub.Federator do
|
||||
)
|
||||
end
|
||||
|
||||
@spec handle(:publish | :publish_single_ap | atom(), Activity.t() | map()) ::
|
||||
:ok | {:ok, Activity.t()} | Tesla.Env.result() | {:error, String.t()}
|
||||
def handle(:publish, activity) do
|
||||
Logger.debug(inspect(activity))
|
||||
Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
|
||||
@@ -46,7 +50,7 @@ defmodule Mobilizon.Federation.ActivityPub.Federator do
|
||||
with {:ok, %Actor{} = actor} <-
|
||||
ActivityPubActor.get_or_fetch_actor_by_url(activity.data["actor"]) do
|
||||
Logger.info(fn -> "Sending #{activity.data["id"]} out via AP" end)
|
||||
ActivityPub.publish(actor, activity)
|
||||
ActivityPub.Publisher.publish(actor, activity)
|
||||
end
|
||||
end
|
||||
|
||||
@@ -67,7 +71,7 @@ defmodule Mobilizon.Federation.ActivityPub.Federator do
|
||||
end
|
||||
|
||||
def handle(:publish_single_ap, params) do
|
||||
ActivityPub.publish_one(params)
|
||||
ActivityPub.Publisher.publish_one(params)
|
||||
end
|
||||
|
||||
def handle(type, _) do
|
||||
@@ -75,6 +79,7 @@ defmodule Mobilizon.Federation.ActivityPub.Federator do
|
||||
{:error, "Don't know what to do with this"}
|
||||
end
|
||||
|
||||
@spec enqueue(atom(), map(), pos_integer()) :: :ok | {:ok, any()} | {:error, any()}
|
||||
def enqueue(type, payload, priority \\ 1) do
|
||||
Logger.debug("enqueue something with type #{inspect(type)}")
|
||||
|
||||
@@ -85,6 +90,7 @@ defmodule Mobilizon.Federation.ActivityPub.Federator do
|
||||
end
|
||||
end
|
||||
|
||||
@spec maybe_start_job(any(), any()) :: {any(), any()}
|
||||
def maybe_start_job(running_jobs, queue) do
|
||||
if :sets.size(running_jobs) < @max_jobs && queue != [] do
|
||||
{{type, payload}, queue} = queue_pop(queue)
|
||||
@@ -96,6 +102,7 @@ defmodule Mobilizon.Federation.ActivityPub.Federator do
|
||||
end
|
||||
end
|
||||
|
||||
@spec handle_cast(any(), any()) :: {:noreply, any()}
|
||||
def handle_cast({:enqueue, type, payload, _priority}, state)
|
||||
when type in [:incoming_doc, :incoming_ap_doc] do
|
||||
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
|
||||
@@ -119,6 +126,7 @@ defmodule Mobilizon.Federation.ActivityPub.Federator do
|
||||
{:noreply, state}
|
||||
end
|
||||
|
||||
@spec handle_info({:DOWN, any(), :process, any, any()}, any) :: {:noreply, map()}
|
||||
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
|
||||
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
|
||||
i_running_jobs = :sets.del_element(ref, i_running_jobs)
|
||||
@@ -129,11 +137,13 @@ defmodule Mobilizon.Federation.ActivityPub.Federator do
|
||||
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
|
||||
end
|
||||
|
||||
@spec enqueue_sorted(any(), any(), pos_integer()) :: any()
|
||||
def enqueue_sorted(queue, element, priority) do
|
||||
[%{item: element, priority: priority} | queue]
|
||||
|> Enum.sort_by(fn %{priority: priority} -> priority end)
|
||||
end
|
||||
|
||||
@spec queue_pop(list(any())) :: {any(), list(any())}
|
||||
def queue_pop([%{item: element} | queue]) do
|
||||
{element, queue}
|
||||
end
|
||||
|
||||
Reference in New Issue
Block a user