Fix issue when updating event and introduce background jobs

Signed-off-by: Thomas Citharel <tcit@tcit.fr>
This commit is contained in:
Thomas Citharel
2019-11-04 15:10:58 +01:00
parent fb25c7c07f
commit 95ba76a0fa
17 changed files with 193 additions and 39 deletions

View File

@@ -7,7 +7,7 @@ defmodule Mix.Tasks.Mobilizon.SetupSearch do
use Mix.Task
alias Mobilizon.Service.Search
alias Mobilizon.Service.Workers.BuildSearchWorker
alias Mobilizon.Storage.Repo
alias Mobilizon.Events.Event
import Ecto.Query
@@ -30,7 +30,7 @@ defmodule Mix.Tasks.Mobilizon.SetupSearch do
end
defp insert_search_event([%Event{url: url} = event | events], nb_events) do
case Search.insert_search_event(event) do
case BuildSearchWorker.insert_search_event(event) do
{:ok, _} ->
Logger.debug("Added event #{url} to the search")

View File

@@ -37,6 +37,7 @@ defmodule Mobilizon do
# supervisors
Mobilizon.Storage.Repo,
MobilizonWeb.Endpoint,
{Oban, Application.get_env(:mobilizon, Oban)},
# workers
Guardian.DB.Token.SweeperServer,
Mobilizon.Service.Federator,
@@ -46,9 +47,7 @@ defmodule Mobilizon do
cachex_spec(:activity_pub, 2500, 3, 15)
]
opts = [strategy: :one_for_one, name: Mobilizon.Supervisor]
Supervisor.start_link(children, opts)
Supervisor.start_link(children, strategy: :one_for_one, name: Mobilizon.Supervisor)
end
@spec config_change(keyword, keyword, [atom]) :: :ok

View File

@@ -179,6 +179,8 @@ defmodule Mobilizon.Events.Event do
defp put_tags(%Changeset{} = changeset, _), do: changeset
# We need a changeset instead of a raw struct because of slug which is generated in changeset
defp process_tag(%{id: _id} = tag), do: tag
defp process_tag(tag) do
Tag.changeset(%Tag{}, tag)
end

View File

@@ -13,7 +13,7 @@ defmodule Mobilizon.Events do
alias Mobilizon.Actors.Actor
alias Mobilizon.Addresses.Address
alias Mobilizon.Service.Search
alias Mobilizon.Service.Workers.BuildSearchWorker
alias Mobilizon.Events.{
Comment,
@@ -253,7 +253,9 @@ defmodule Mobilizon.Events do
def create_event(attrs \\ %{}) do
with {:ok, %{insert: %Event{} = event}} <- do_create_event(attrs),
%Event{} = event <- Repo.preload(event, @event_preloads) do
Task.start(fn -> Search.insert_search_event(event) end)
unless event.draft,
do: BuildSearchWorker.enqueue(:insert_search_event, %{"event_id" => event.id})
{:ok, event}
else
err -> err
@@ -291,7 +293,7 @@ defmodule Mobilizon.Events do
We start by updating the event and then insert a first participant if the event is not a draft anymore
"""
@spec update_event(Event.t(), map) :: {:ok, Event.t()} | {:error, Changeset.t()}
def update_event(%Event{} = old_event, attrs) do
def update_event(%Event{draft: old_draft} = old_event, attrs) do
with %Changeset{changes: changes} = changeset <-
Event.update_changeset(Repo.preload(old_event, :tags), attrs),
{:ok, %{update: %Event{} = new_event}} <-
@@ -301,7 +303,7 @@ defmodule Mobilizon.Events do
changeset
)
|> Multi.run(:write, fn _repo, %{update: %Event{draft: draft} = event} ->
with {:is_draft, false} <- {:is_draft, draft},
with {:was_draft, true} <- {:was_draft, old_draft == true && draft == false},
{:ok, %Participant{} = participant} <-
create_participant(
%{
@@ -313,7 +315,7 @@ defmodule Mobilizon.Events do
) do
{:ok, participant}
else
{:is_draft, true} -> {:ok, nil}
{:was_draft, false} -> {:ok, nil}
err -> err
end
end)
@@ -326,7 +328,8 @@ defmodule Mobilizon.Events do
changes
)
Task.start(fn -> Search.update_search_event(new_event) end)
unless new_event.draft,
do: BuildSearchWorker.enqueue(:update_search_event, %{"event_id" => new_event.id})
{:ok, Repo.preload(new_event, @event_preloads)}
end
@@ -473,6 +476,16 @@ defmodule Mobilizon.Events do
|> Repo.one()
end
@doc """
Gets a tag by its title.
"""
@spec get_tag_by_title(String.t()) :: Tag.t() | nil
def get_tag_by_title(slug) do
slug
|> tag_by_title_query()
|> Repo.one()
end
@doc """
Gets an existing tag or creates the new one.
"""
@@ -1351,6 +1364,11 @@ defmodule Mobilizon.Events do
from(t in Tag, where: t.slug == ^slug)
end
@spec tag_by_title_query(String.t()) :: Ecto.Query.t()
defp tag_by_title_query(title) do
from(t in Tag, where: t.title == ^title, limit: 1)
end
@spec tags_for_event_query(integer) :: Ecto.Query.t()
defp tags_for_event_query(event_id) do
from(

View File

@@ -4,6 +4,7 @@ defmodule Mobilizon.Service.ActivityPub.Converter.Utils do
"""
alias Mobilizon.Actors.Actor
alias Mobilizon.Events
alias Mobilizon.Events.Tag
alias Mobilizon.Mention
alias Mobilizon.Service.ActivityPub
@@ -66,7 +67,7 @@ defmodule Mobilizon.Service.ActivityPub.Converter.Utils do
defp fetch_tag(tag, acc) when is_map(tag) do
case tag["type"] do
"Hashtag" ->
acc ++ [%{title: tag}]
acc ++ [existing_tag_or_data(tag["name"])]
_err ->
acc
@@ -74,7 +75,18 @@ defmodule Mobilizon.Service.ActivityPub.Converter.Utils do
end
defp fetch_tag(tag, acc) when is_bitstring(tag) do
acc ++ [%{title: tag}]
acc ++ [existing_tag_or_data(tag)]
end
defp existing_tag_or_data("#" <> tag_title) do
existing_tag_or_data(tag_title)
end
defp existing_tag_or_data(tag_title) do
case Events.get_tag_by_title(tag_title) do
%Tag{} = tag -> %{title: tag.title, id: tag.id}
nil -> %{title: tag_title}
end
end
@spec create_mention(map(), list()) :: list()

View File

@@ -1,12 +1,28 @@
defmodule Mobilizon.Service.Search do
defmodule Mobilizon.Service.Workers.BuildSearchWorker do
@moduledoc """
Module to handle search service
Worker to build search results
"""
alias Mobilizon.Events
alias Mobilizon.Events.Event
alias Mobilizon.Storage.Repo
alias Ecto.Adapters.SQL
use Mobilizon.Service.Workers.WorkerHelper, queue: "search"
@impl Oban.Worker
def perform(%{"op" => "insert_search_event", "event_id" => event_id}, _job) do
with {:ok, %Event{} = event} <- Events.get_event_with_preload(event_id) do
insert_search_event(event)
end
end
def perform(%{"op" => "update_search_event", "event_id" => event_id}, _job) do
with {:ok, %Event{} = event} <- Events.get_event_with_preload(event_id) do
update_search_event(event)
end
end
def insert_search_event(%Event{} = event) do
SQL.query(
Repo,

View File

@@ -0,0 +1,50 @@
# Portions of this file are derived from Pleroma:
# Copyright © 2017-2019 Pleroma Authors <https://pleroma.social>
# SPDX-License-Identifier: AGPL-3.0-only
# Upstream: https://git.pleroma.social/pleroma/pleroma/blob/develop/lib/pleroma/workers/worker_helper.ex
defmodule Mobilizon.Service.Workers.WorkerHelper do
@moduledoc """
Tools to ease dealing with workers
"""
alias Mobilizon.Config
alias Mobilizon.Service.Workers.WorkerHelper
def worker_args(queue) do
case Config.get([:workers, :retries, queue]) do
nil -> []
max_attempts -> [max_attempts: max_attempts]
end
end
def sidekiq_backoff(attempt, pow \\ 4, base_backoff \\ 15) do
backoff =
:math.pow(attempt, pow) +
base_backoff +
:rand.uniform(2 * base_backoff) * attempt
trunc(backoff)
end
defmacro __using__(opts) do
caller_module = __CALLER__.module
queue = Keyword.fetch!(opts, :queue)
quote do
# Note: `max_attempts` is intended to be overridden in `new/2` call
use Oban.Worker,
queue: unquote(queue),
max_attempts: 1
def enqueue(operation, params, worker_args \\ []) do
params = Map.merge(%{"op" => operation}, params)
queue_atom = String.to_existing_atom(unquote(queue))
worker_args = worker_args ++ WorkerHelper.worker_args(queue_atom)
unquote(caller_module)
|> apply(:new, [params, worker_args])
|> Mobilizon.Storage.Repo.insert()
end
end
end
end