Signed-off-by: Thomas Citharel <tcit@tcit.fr>
This commit is contained in:
Thomas Citharel
2018-05-17 11:32:23 +02:00
parent 2c1abe5e19
commit e14007bac5
45 changed files with 2916 additions and 111 deletions

View File

@@ -0,0 +1,276 @@
defmodule Eventos.Service.ActivityPub do
alias Eventos.Events
alias Eventos.Events.Event
alias Eventos.Service.ActivityPub.Transmogrifier
alias Eventos.Service.WebFinger
alias Eventos.Activity
alias Eventos.Accounts
alias Eventos.Accounts.Account
alias Eventos.Service.Federator
import Logger
import Eventos.Service.ActivityPub.Utils
def get_recipients(data) do
(data["to"] || []) ++ (data["cc"] || [])
end
def insert(map, local \\ true) when is_map(map) do
with map <- lazy_put_activity_defaults(map),
:ok <- insert_full_object(map) do
map = Map.put(map, "id", Ecto.UUID.generate())
activity = %Activity{
data: map,
local: local,
actor: map["actor"],
recipients: get_recipients(map)
}
# Notification.create_notifications(activity)
#stream_out(activity)
{:ok, activity}
else
%Activity{} = activity -> {:ok, activity}
error -> {:error, error}
end
end
def fetch_event_from_url(url) do
if object = Events.get_event_by_url!(url) do
{:ok, object}
else
Logger.info("Fetching #{url} via AP")
with true <- String.starts_with?(url, "http"),
{:ok, %{body: body, status_code: code}} when code in 200..299 <-
HTTPoison.get(
url,
[Accept: "application/activity+json"],
follow_redirect: true,
timeout: 10000,
recv_timeout: 20000
),
{:ok, data} <- Jason.decode(body),
nil <- Events.get_event_by_url!(data["id"]),
params <- %{
"type" => "Create",
"to" => data["to"],
"cc" => data["cc"],
"actor" => data["attributedTo"],
"object" => data
},
{:ok, activity} <- Transmogrifier.handle_incoming(params) do
{:ok, Events.get_event_by_url!(activity.data["object"]["id"])}
else
object = %Event{} -> {:ok, object}
e -> e
end
end
end
def create(%{to: to, actor: actor, context: context, object: object} = params) do
additional = params[:additional] || %{}
# only accept false as false value
local = !(params[:local] == false)
published = params[:published]
with create_data <-
make_create_data(
%{to: to, actor: actor, published: published, context: context, object: object},
additional
),
{:ok, activity} <- insert(create_data, local),
:ok <- maybe_federate(activity) do
# {:ok, actor} <- Accounts.increase_event_count(actor) do
{:ok, activity}
end
end
def accept(%{to: to, actor: actor, object: object} = params) do
# only accept false as false value
local = !(params[:local] == false)
with data <- %{"to" => to, "type" => "Accept", "actor" => actor, "object" => object},
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
end
def update(%{to: to, cc: cc, actor: actor, object: object} = params) do
# only accept false as false value
local = !(params[:local] == false)
with data <- %{
"to" => to,
"cc" => cc,
"type" => "Update",
"actor" => actor,
"object" => object
},
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
end
def follow(follower, followed, activity_id \\ nil, local \\ true) do
with data <- make_follow_data(follower, followed, activity_id),
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity) do
{:ok, activity}
end
end
def delete(%Event{url: url, organizer_account: account} = event, local \\ true) do
data = %{
"type" => "Delete",
"actor" => account.url,
"object" => url,
"to" => [account.url <> "/followers", "https://www.w3.org/ns/activitystreams#Public"]
}
with Events.delete_event(event),
{:ok, activity} <- insert(data, local),
:ok <- maybe_federate(activity)
do
{:ok, activity}
end
end
def create_public_activities(%Account{} = account) do
end
def make_account_from_url(url) do
with {:ok, data} <- fetch_and_prepare_user_from_url(url) do
Accounts.insert_or_update_account(data)
else
e ->
Logger.error("Failed to make account from url")
Logger.error(inspect e)
{:error, e}
end
end
def make_account_from_nickname(nickname) do
with {:ok, %{"url" => url}} when not is_nil(url) <- WebFinger.finger(nickname) do
make_account_from_url(url)
else
_e -> {:error, "No ActivityPub URL found in WebFinger"}
end
end
def publish(actor, activity) do
# followers =
# if actor.follower_address in activity.recipients do
# {:ok, followers} = User.get_followers(actor)
# followers |> Enum.filter(&(!&1.local))
# else
# []
# end
followers = ["http://localhost:3000/users/tcit/inbox"]
remote_inboxes = followers
{:ok, data} = Transmogrifier.prepare_outgoing(activity.data)
json = Jason.encode!(data)
Enum.each(remote_inboxes, fn inbox ->
Federator.enqueue(:publish_single_ap, %{
inbox: inbox,
json: json,
actor: actor,
id: activity.data["id"]
})
end)
end
def publish_one(%{inbox: inbox, json: json, actor: actor, id: id}) do
Logger.info("Federating #{id} to #{inbox}")
host = URI.parse(inbox).host
signature =
Eventos.Service.HTTPSignatures.sign(actor, %{host: host, "content-length": byte_size(json)})
Logger.debug("signature")
Logger.debug(inspect signature)
{:ok, response} = HTTPoison.post(
inbox,
json,
[{"Content-Type", "application/activity+json"}, {"signature", signature}],
hackney: [pool: :default]
)
Logger.debug(inspect response)
end
def fetch_and_prepare_user_from_url(url) do
Logger.debug("Fetching and preparing user from url")
with {:ok, %{status_code: 200, body: body}} <-
HTTPoison.get(url, [Accept: "application/activity+json"], [follow_redirect: true]),
{:ok, data} <- Jason.decode(body) do
user_data_from_user_object(data)
else
e -> Logger.error("Could not decode user at fetch #{url}, #{inspect(e)}")
end
end
def user_data_from_user_object(data) do
avatar =
data["icon"]["url"] &&
%{
"type" => "Image",
"url" => [%{"href" => data["icon"]["url"]}]
}
banner =
data["image"]["url"] &&
%{
"type" => "Image",
"url" => [%{"href" => data["image"]["url"]}]
}
user_data = %{
url: data["id"],
info: %{
"ap_enabled" => true,
"source_data" => data,
"banner" => banner
},
avatar: avatar,
username: "#{data["preferredUsername"]}@#{URI.parse(data["id"]).host}",
display_name: data["name"],
follower_address: data["followers"],
description: data["summary"],
public_key: data["publicKey"]["publicKeyPem"],
}
{:ok, user_data}
end
@spec fetch_public_activities_for_account(Account.t, integer(), integer()) :: list()
def fetch_public_activities_for_account(%Account{} = account, page \\ 10, limit \\ 1) do
{:ok, events, total} = Events.get_events_for_account(account, page, limit)
activities = Enum.map(events, fn event ->
{:ok, activity} = event_to_activity(event)
activity
end)
{activities, total}
end
defp event_to_activity(%Event{} = event) do
activity = %Activity{
data: event,
local: true,
actor: event.organizer_account.url,
recipients: ["https://www.w3.org/ns/activitystreams#Public"]
}
# Notification.create_notifications(activity)
#stream_out(activity)
{:ok, activity}
end
end

View File

@@ -0,0 +1,482 @@
defmodule Eventos.Service.ActivityPub.Transmogrifier do
@moduledoc """
A module to handle coding from internal to wire ActivityPub and back.
"""
alias Eventos.Accounts.Account
alias Eventos.Accounts
alias Eventos.Events.Event
alias Eventos.Service.ActivityPub
import Ecto.Query
require Logger
@doc """
Modifies an incoming AP object (mastodon format) to our internal format.
"""
def fix_object(object) do
object
|> Map.put("actor", object["attributedTo"])
|> fix_attachments
|> fix_context
#|> fix_in_reply_to
|> fix_emoji
|> fix_tag
end
# def fix_in_reply_to(%{"inReplyTo" => in_reply_to_id} = object)
# when not is_nil(in_reply_to_id) do
# case ActivityPub.fetch_object_from_id(in_reply_to_id) do
# {:ok, replied_object} ->
# activity = Activity.get_create_activity_by_object_ap_id(replied_object.data["id"])
#
# object
# |> Map.put("inReplyTo", replied_object.data["id"])
# |> Map.put("inReplyToAtomUri", object["inReplyToAtomUri"] || in_reply_to_id)
# |> Map.put("inReplyToStatusId", activity.id)
# |> Map.put("conversation", replied_object.data["context"] || object["conversation"])
# |> Map.put("context", replied_object.data["context"] || object["conversation"])
#
# e ->
# Logger.error("Couldn't fetch #{object["inReplyTo"]} #{inspect(e)}")
# object
# end
# end
def fix_in_reply_to(object), do: object
def fix_context(object) do
object
|> Map.put("context", object["conversation"])
end
def fix_attachments(object) do
attachments =
(object["attachment"] || [])
|> Enum.map(fn data ->
url = [%{"type" => "Link", "mediaType" => data["mediaType"], "href" => data["url"]}]
Map.put(data, "url", url)
end)
object
|> Map.put("attachment", attachments)
end
def fix_emoji(object) do
tags = object["tag"] || []
emoji = tags |> Enum.filter(fn data -> data["type"] == "Emoji" and data["icon"] end)
emoji =
emoji
|> Enum.reduce(%{}, fn data, mapping ->
name = data["name"]
if String.starts_with?(name, ":") do
name = name |> String.slice(1..-2)
end
mapping |> Map.put(name, data["icon"]["url"])
end)
# we merge mastodon and pleroma emoji into a single mapping, to allow for both wire formats
emoji = Map.merge(object["emoji"] || %{}, emoji)
object
|> Map.put("emoji", emoji)
end
def fix_tag(object) do
tags =
(object["tag"] || [])
|> Enum.filter(fn data -> data["type"] == "Hashtag" and data["name"] end)
|> Enum.map(fn data -> String.slice(data["name"], 1..-1) end)
combined = (object["tag"] || []) ++ tags
object
|> Map.put("tag", combined)
end
# TODO: validate those with a Ecto scheme
# - tags
# - emoji
def handle_incoming(%{"type" => "Create", "object" => %{"type" => "Note"} = object} = data) do
with %Account{} = account <- Account.get_or_fetch_by_url(data["actor"]) do
object = fix_object(data["object"])
params = %{
to: data["to"],
object: object,
actor: account,
context: object["conversation"],
local: false,
published: data["published"],
additional:
Map.take(data, [
"cc",
"id"
])
}
ActivityPub.create(params)
end
end
def handle_incoming(
%{"type" => "Follow", "object" => followed, "actor" => follower, "id" => id} = data
) do
with %Account{} = followed <- Accounts.get_account_by_url(followed),
%Account{} = follower <- Accounts.get_or_fetch_by_url(follower),
{:ok, activity} <- ActivityPub.follow(follower, followed, id, false) do
ActivityPub.accept(%{to: [follower.url], actor: followed.url, object: data, local: true})
#Accounts.follow(follower, followed)
{:ok, activity}
else
_e -> :error
end
end
#
# def handle_incoming(
# %{"type" => "Like", "object" => object_id, "actor" => actor, "id" => id} = data
# ) do
# with %User{} = actor <- User.get_or_fetch_by_ap_id(actor),
# {:ok, object} <-
# get_obj_helper(object_id) || ActivityPub.fetch_object_from_id(object_id),
# {:ok, activity, object} <- ActivityPub.like(actor, object, id, false) do
# {:ok, activity}
# else
# _e -> :error
# end
# end
#
# def handle_incoming(
# %{"type" => "Announce", "object" => object_id, "actor" => actor, "id" => id} = data
# ) do
# with %User{} = actor <- User.get_or_fetch_by_ap_id(actor),
# {:ok, object} <-
# get_obj_helper(object_id) || ActivityPub.fetch_object_from_id(object_id),
# {:ok, activity, object} <- ActivityPub.announce(actor, object, id, false) do
# {:ok, activity}
# else
# _e -> :error
# end
# end
#
# def handle_incoming(
# %{"type" => "Update", "object" => %{"type" => "Person"} = object, "actor" => actor_id} =
# data
# ) do
# with %User{ap_id: ^actor_id} = actor <- User.get_by_ap_id(object["id"]) do
# {:ok, new_user_data} = ActivityPub.user_data_from_user_object(object)
#
# banner = new_user_data[:info]["banner"]
#
# update_data =
# new_user_data
# |> Map.take([:name, :bio, :avatar])
# |> Map.put(:info, Map.merge(actor.info, %{"banner" => banner}))
#
# actor
# |> User.upgrade_changeset(update_data)
# |> User.update_and_set_cache()
#
# ActivityPub.update(%{
# local: false,
# to: data["to"] || [],
# cc: data["cc"] || [],
# object: object,
# actor: actor_id
# })
# else
# e ->
# Logger.error(e)
# :error
# end
# end
#
# # TODO: Make secure.
# def handle_incoming(
# %{"type" => "Delete", "object" => object_id, "actor" => actor, "id" => id} = data
# ) do
# object_id =
# case object_id do
# %{"id" => id} -> id
# id -> id
# end
#
# with %User{} = actor <- User.get_or_fetch_by_ap_id(actor),
# {:ok, object} <-
# get_obj_helper(object_id) || ActivityPub.fetch_object_from_id(object_id),
# {:ok, activity} <- ActivityPub.delete(object, false) do
# {:ok, activity}
# else
# e -> :error
# end
# end
#
# # TODO
# # Accept
# # Undo
#
# def handle_incoming(_), do: :error
#
# def get_obj_helper(id) do
# if object = Object.get_by_ap_id(id), do: {:ok, object}, else: nil
# end
#
# def set_reply_to_uri(%{"inReplyTo" => inReplyTo} = object) do
# with false <- String.starts_with?(inReplyTo, "http"),
# {:ok, %{data: replied_to_object}} <- get_obj_helper(inReplyTo) do
# Map.put(object, "inReplyTo", replied_to_object["external_url"] || inReplyTo)
# else
# _e -> object
# end
# end
#
# def set_reply_to_uri(obj), do: obj
#
# # Prepares the object of an outgoing create activity.
# def prepare_object(object) do
# object
# |> set_sensitive
# |> add_hashtags
# |> add_mention_tags
# |> add_emoji_tags
# |> add_attributed_to
# |> prepare_attachments
# |> set_conversation
# |> set_reply_to_uri
# end
@doc
"""
internal -> Mastodon
"""
def prepare_outgoing(%{"type" => "Create", "object" => %{"type" => "Note"} = object} = data) do
object =
object
#|> prepare_object
data =
data
|> Map.put("object", object)
|> Map.put("@context", "https://www.w3.org/ns/activitystreams")
{:ok, data}
end
def prepare_outgoing(%{"type" => type} = data) do
data =
data
#|> maybe_fix_object_url
|> Map.put("@context", "https://www.w3.org/ns/activitystreams")
{:ok, data}
end
def prepare_outgoing(%Event{} = event) do
event =
event
|> Map.from_struct
|> Map.drop([:"__meta__"])
|> Map.put(:"@context", "https://www.w3.org/ns/activitystreams")
{:ok, event}
end
#
# def maybe_fix_object_url(data) do
# if is_binary(data["object"]) and not String.starts_with?(data["object"], "http") do
# case ActivityPub.fetch_object_from_id(data["object"]) do
# {:ok, relative_object} ->
# if relative_object.data["external_url"] do
# data =
# data
# |> Map.put("object", relative_object.data["external_url"])
# else
# data
# end
#
# e ->
# Logger.error("Couldn't fetch #{data["object"]} #{inspect(e)}")
# data
# end
# else
# data
# end
# end
#
# def add_hashtags(object) do
# tags =
# (object["tag"] || [])
# |> Enum.map(fn tag ->
# %{
# "href" => Pleroma.Web.Endpoint.url() <> "/tags/#{tag}",
# "name" => "##{tag}",
# "type" => "Hashtag"
# }
# end)
#
# object
# |> Map.put("tag", tags)
# end
#
# def add_mention_tags(object) do
# recipients = object["to"] ++ (object["cc"] || [])
#
# mentions =
# recipients
# |> Enum.map(fn ap_id -> User.get_cached_by_ap_id(ap_id) end)
# |> Enum.filter(& &1)
# |> Enum.map(fn user ->
# %{"type" => "Mention", "href" => user.ap_id, "name" => "@#{user.nickname}"}
# end)
#
# tags = object["tag"] || []
#
# object
# |> Map.put("tag", tags ++ mentions)
# end
#
# # TODO: we should probably send mtime instead of unix epoch time for updated
# def add_emoji_tags(object) do
# tags = object["tag"] || []
# emoji = object["emoji"] || []
#
# out =
# emoji
# |> Enum.map(fn {name, url} ->
# %{
# "icon" => %{"url" => url, "type" => "Image"},
# "name" => ":" <> name <> ":",
# "type" => "Emoji",
# "updated" => "1970-01-01T00:00:00Z",
# "id" => url
# }
# end)
#
# object
# |> Map.put("tag", tags ++ out)
# end
#
# def set_conversation(object) do
# Map.put(object, "conversation", object["context"])
# end
#
# def set_sensitive(object) do
# tags = object["tag"] || []
# Map.put(object, "sensitive", "nsfw" in tags)
# end
#
# def add_attributed_to(object) do
# attributedTo = object["attributedTo"] || object["actor"]
#
# object
# |> Map.put("attributedTo", attributedTo)
# end
#
# def prepare_attachments(object) do
# attachments =
# (object["attachment"] || [])
# |> Enum.map(fn data ->
# [%{"mediaType" => media_type, "href" => href} | _] = data["url"]
# %{"url" => href, "mediaType" => media_type, "name" => data["name"], "type" => "Document"}
# end)
#
# object
# |> Map.put("attachment", attachments)
# end
#
# defp user_upgrade_task(user) do
# old_follower_address = User.ap_followers(user)
#
# q =
# from(
# u in User,
# where: ^old_follower_address in u.following,
# update: [
# set: [
# following:
# fragment(
# "array_replace(?,?,?)",
# u.following,
# ^old_follower_address,
# ^user.follower_address
# )
# ]
# ]
# )
#
# Repo.update_all(q, [])
#
# maybe_retire_websub(user.ap_id)
#
# # Only do this for recent activties, don't go through the whole db.
# # Only look at the last 1000 activities.
# since = (Repo.aggregate(Activity, :max, :id) || 0) - 1_000
#
# q =
# from(
# a in Activity,
# where: ^old_follower_address in a.recipients,
# where: a.id > ^since,
# update: [
# set: [
# recipients:
# fragment(
# "array_replace(?,?,?)",
# a.recipients,
# ^old_follower_address,
# ^user.follower_address
# )
# ]
# ]
# )
#
# Repo.update_all(q, [])
# end
#
# def upgrade_user_from_ap_id(ap_id, async \\ true) do
# with %User{local: false} = user <- User.get_by_ap_id(ap_id),
# {:ok, data} <- ActivityPub.fetch_and_prepare_user_from_ap_id(ap_id) do
# data =
# data
# |> Map.put(:info, Map.merge(user.info, data[:info]))
#
# already_ap = User.ap_enabled?(user)
#
# {:ok, user} =
# User.upgrade_changeset(user, data)
# |> Repo.update()
#
# if !already_ap do
# # This could potentially take a long time, do it in the background
# if async do
# Task.start(fn ->
# user_upgrade_task(user)
# end)
# else
# user_upgrade_task(user)
# end
# end
#
# {:ok, user}
# else
# e -> e
# end
# end
#
# def maybe_retire_websub(ap_id) do
# # some sanity checks
# if is_binary(ap_id) && String.length(ap_id) > 8 do
# q =
# from(
# ws in Pleroma.Web.Websub.WebsubClientSubscription,
# where: fragment("? like ?", ws.topic, ^"#{ap_id}%")
# )
#
# Repo.delete_all(q)
# end
# end
end

View File

@@ -0,0 +1,304 @@
defmodule Eventos.Service.ActivityPub.Utils do
alias Eventos.Repo
alias Eventos.Accounts
alias Eventos.Accounts.Account
alias Eventos.Events.Event
alias Eventos.Events
alias Eventos.Activity
alias EventosWeb
alias EventosWeb.Router.Helpers
alias EventosWeb.Endpoint
alias Ecto.{Changeset, UUID}
import Ecto.Query
def make_json_ld_header do
%{
"@context" => [
"https://www.w3.org/ns/activitystreams",
%{
"manuallyApprovesFollowers" => "as:manuallyApprovesFollowers",
"sensitive" => "as:sensitive",
"Hashtag" => "as:Hashtag",
"toot" => "http://joinmastodon.org/ns#",
"Emoji" => "toot:Emoji"
}
]
}
end
def make_date do
DateTime.utc_now() |> DateTime.to_iso8601()
end
def generate_activity_id do
generate_id("activities")
end
def generate_context_id do
generate_id("contexts")
end
# def generate_object_id do
# Helpers.o_status_url(Endpoint, :object, UUID.generate())
# end
def generate_id(type) do
"#{EventosWeb.Endpoint.url()}/#{type}/#{UUID.generate()}"
end
# def create_context(context) do
# context = context || generate_id("contexts")
# changeset = Object.context_mapping(context)
#
# case Repo.insert(changeset) do
# {:ok, object} ->
# object
#
# # This should be solved by an upsert, but it seems ecto
# # has problems accessing the constraint inside the jsonb.
# {:error, _} ->
# Events.get_cached_by_url(context)
# end
# end
@doc """
Enqueues an activity for federation if it's local
"""
def maybe_federate(%Activity{local: true} = activity) do
priority =
case activity.data["type"] do
"Delete" -> 10
"Create" -> 1
_ -> 5
end
Eventos.Service.Federator.enqueue(:publish, activity, priority)
:ok
end
def maybe_federate(_), do: :ok
@doc """
Adds an id and a published data if they aren't there,
also adds it to an included object
"""
def lazy_put_activity_defaults(map) do
# %{data: %{"id" => context}, id: context_id} = create_context(map["context"])
#
# map =
# map
# |> Map.put_new_lazy("id", &generate_activity_id/0)
# |> Map.put_new_lazy("published", &make_date/0)
# |> Map.put_new("context", context)
# |> Map.put_new("context_id", context_id)
if is_map(map["object"]) do
object = lazy_put_object_defaults(map["object"], map)
%{map | "object" => object}
else
map
end
end
@doc """
Adds an id and published date if they aren't there.
"""
def lazy_put_object_defaults(map, activity \\ %{}) do
map
#|> Map.put_new_lazy("id", &generate_object_id/0)
|> Map.put_new_lazy("published", &make_date/0)
|> Map.put_new("context", activity["context"])
|> Map.put_new("context_id", activity["context_id"])
end
@doc """
Inserts a full object if it is contained in an activity.
"""
def insert_full_object(%{"object" => %{"type" => type} = object_data})
when is_map(object_data) and type == "Event" do
with {:ok, _} <- Events.create_event(object_data) do
:ok
end
end
@doc """
Inserts a full object if it is contained in an activity.
"""
def insert_full_object(%{"object" => %{"type" => type} = object_data})
when is_map(object_data) and type == "Note" do
account = Accounts.get_account_by_url(object_data["actor"])
data = %{"text" => object_data["content"], "url" => object_data["url"], "account_id" => account.id, "in_reply_to_comment_id" => object_data["inReplyTo"]}
with {:ok, _} <- Events.create_comment(data) do
:ok
end
end
def insert_full_object(_), do: :ok
# def update_object_in_activities(%{data: %{"id" => id}} = object) do
# # TODO
# # Update activities that already had this. Could be done in a seperate process.
# # Alternatively, just don't do this and fetch the current object each time. Most
# # could probably be taken from cache.
# relevant_activities = Activity.all_by_object_url(id)
#
# Enum.map(relevant_activities, fn activity ->
# new_activity_data = activity.data |> Map.put("object", object.data)
# changeset = Changeset.change(activity, data: new_activity_data)
# Repo.update(changeset)
# end)
# end
#### Like-related helpers
# @doc """
# Returns an existing like if a user already liked an object
# """
# def get_existing_like(actor, %{data: %{"id" => id}}) do
# query =
# from(
# activity in Activity,
# where: fragment("(?)->>'actor' = ?", activity.data, ^actor),
# # this is to use the index
# where:
# fragment(
# "coalesce((?)->'object'->>'id', (?)->>'object') = ?",
# activity.data,
# activity.data,
# ^id
# ),
# where: fragment("(?)->>'type' = 'Like'", activity.data)
# )
#
# Repo.one(query)
# end
def make_like_data(%Account{url: url} = actor, %{data: %{"id" => id}} = object, activity_id) do
data = %{
"type" => "Like",
"actor" => url,
"object" => id,
"to" => [actor.follower_address, object.data["actor"]],
"cc" => ["https://www.w3.org/ns/activitystreams#Public"],
"context" => object.data["context"]
}
if activity_id, do: Map.put(data, "id", activity_id), else: data
end
def update_element_in_object(property, element, object) do
with new_data <-
object.data
|> Map.put("#{property}_count", length(element))
|> Map.put("#{property}s", element),
changeset <- Changeset.change(object, data: new_data),
{:ok, object} <- Repo.update(changeset) do
{:ok, object}
end
end
# def update_likes_in_object(likes, object) do
# update_element_in_object("like", likes, object)
# end
#
# def add_like_to_object(%Activity{data: %{"actor" => actor}}, object) do
# with likes <- [actor | object.data["likes"] || []] |> Enum.uniq() do
# update_likes_in_object(likes, object)
# end
# end
#
# def remove_like_from_object(%Activity{data: %{"actor" => actor}}, object) do
# with likes <- (object.data["likes"] || []) |> List.delete(actor) do
# update_likes_in_object(likes, object)
# end
# end
#### Follow-related helpers
@doc """
Makes a follow activity data for the given follower and followed
"""
def make_follow_data(%Account{url: follower_id}, %Account{url: followed_id}, activity_id) do
data = %{
"type" => "Follow",
"actor" => follower_id,
"to" => [followed_id],
"cc" => ["https://www.w3.org/ns/activitystreams#Public"],
"object" => followed_id
}
if activity_id, do: Map.put(data, "id", activity_id), else: data
end
# def fetch_latest_follow(%Account{url: follower_id}, %Account{url: followed_id}) do
# query =
# from(
# activity in Activity,
# where:
# fragment(
# "? @> ?",
# activity.data,
# ^%{type: "Follow", actor: follower_id, object: followed_id}
# ),
# order_by: [desc: :id],
# limit: 1
# )
#
# Repo.one(query)
# end
#### Announce-related helpers
@doc """
Make announce activity data for the given actor and object
"""
def make_announce_data(
%Account{url: url} = user,
%Event{id: id} = object,
activity_id
) do
data = %{
"type" => "Announce",
"actor" => url,
"object" => id,
"to" => [user.follower_address, object.data["actor"]],
"cc" => ["https://www.w3.org/ns/activitystreams#Public"],
"context" => object.data["context"]
}
if activity_id, do: Map.put(data, "id", activity_id), else: data
end
def add_announce_to_object(%Activity{data: %{"actor" => actor}}, object) do
with announcements <- [actor | object.data["announcements"] || []] |> Enum.uniq() do
update_element_in_object("announcement", announcements, object)
end
end
#### Unfollow-related helpers
def make_unfollow_data(follower, followed, follow_activity) do
%{
"type" => "Undo",
"actor" => follower.url,
"to" => [followed.url],
"object" => follow_activity.data["id"]
}
end
#### Create-related helpers
def make_create_data(params, additional) do
published = params.published || make_date()
%{
"type" => "Create",
"to" => params.to |> Enum.uniq(),
"actor" => params.actor.url,
"object" => params.object,
"published" => published,
"context" => params.context
}
|> Map.merge(additional)
end
end

126
lib/service/federator.ex Normal file
View File

@@ -0,0 +1,126 @@
defmodule Eventos.Service.Federator do
use GenServer
alias Eventos.Accounts
alias Eventos.Activity
alias Eventos.Service.ActivityPub
alias Eventos.Service.ActivityPub.Transmogrifier
require Logger
@max_jobs 20
def init(args) do
{:ok, args}
end
def start_link do
spawn(fn ->
# 1 minute
Process.sleep(1000 * 60 * 1)
end)
GenServer.start_link(
__MODULE__,
%{
in: {:sets.new(), []},
out: {:sets.new(), []}
},
name: __MODULE__
)
end
def handle(:publish, activity) do
Logger.debug(inspect activity)
Logger.debug(fn -> "Running publish for #{activity.data["id"]}" end)
with actor when not is_nil(actor) <- Accounts.get_account_by_url(activity.data["actor"]) do
Logger.info(fn -> "Sending #{activity.data["id"]} out via AP" end)
ActivityPub.publish(actor, activity)
end
end
def handle(:incoming_ap_doc, params) do
Logger.info("Handling incoming AP activity")
Logger.debug(inspect params)
with {:ok, _activity} <- Transmogrifier.handle_incoming(params) do
else
%Activity{} ->
Logger.info("Already had #{params["id"]}")
_e ->
# Just drop those for now
Logger.info("Unhandled activity")
Logger.info(Poison.encode!(params, pretty: 2))
end
end
def handle(:publish_single_ap, params) do
ActivityPub.publish_one(params)
end
def handle(type, _) do
Logger.debug(fn -> "Unknown task: #{type}" end)
{:error, "Don't know what to do with this"}
end
def enqueue(type, payload, priority \\ 1) do
Logger.debug("enqueue")
if Mix.env() == :test do
handle(type, payload)
else
GenServer.cast(__MODULE__, {:enqueue, type, payload, priority})
end
end
def maybe_start_job(running_jobs, queue) do
if :sets.size(running_jobs) < @max_jobs && queue != [] do
{{type, payload}, queue} = queue_pop(queue)
{:ok, pid} = Task.start(fn -> handle(type, payload) end)
mref = Process.monitor(pid)
{:sets.add_element(mref, running_jobs), queue}
else
{running_jobs, queue}
end
end
def handle_cast({:enqueue, type, payload, _priority}, state)
when type in [:incoming_doc, :incoming_ap_doc] do
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
i_queue = enqueue_sorted(i_queue, {type, payload}, 1)
{i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
end
def handle_cast({:enqueue, type, payload, _priority}, state) do
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
o_queue = enqueue_sorted(o_queue, {type, payload}, 1)
{o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
end
def handle_cast(m, state) do
IO.inspect("Unknown: #{inspect(m)}, #{inspect(state)}")
{:noreply, state}
end
def handle_info({:DOWN, ref, :process, _pid, _reason}, state) do
%{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}} = state
i_running_jobs = :sets.del_element(ref, i_running_jobs)
o_running_jobs = :sets.del_element(ref, o_running_jobs)
{i_running_jobs, i_queue} = maybe_start_job(i_running_jobs, i_queue)
{o_running_jobs, o_queue} = maybe_start_job(o_running_jobs, o_queue)
{:noreply, %{in: {i_running_jobs, i_queue}, out: {o_running_jobs, o_queue}}}
end
def enqueue_sorted(queue, element, priority) do
[%{item: element, priority: priority} | queue]
|> Enum.sort_by(fn %{priority: priority} -> priority end)
end
def queue_pop([%{item: element} | queue]) do
{element, queue}
end
end

View File

@@ -0,0 +1,112 @@
# https://tools.ietf.org/html/draft-cavage-http-signatures-08
defmodule Eventos.Service.HTTPSignatures do
alias Eventos.Accounts.Account
alias Eventos.Service.ActivityPub
require Logger
def split_signature(sig) do
default = %{"headers" => "date"}
sig =
sig
|> String.trim()
|> String.split(",")
|> Enum.reduce(default, fn part, acc ->
[key | rest] = String.split(part, "=")
value = Enum.join(rest, "=")
Map.put(acc, key, String.trim(value, "\""))
end)
Map.put(sig, "headers", String.split(sig["headers"], ~r/\s/))
end
def validate(headers, signature, public_key) do
sigstring = build_signing_string(headers, signature["headers"])
Logger.debug("Signature: #{signature["signature"]}")
Logger.debug("Sigstring: #{sigstring}")
{:ok, sig} = Base.decode64(signature["signature"])
Logger.debug(inspect sig)
Logger.debug(inspect public_key)
case ExPublicKey.verify(sigstring, sig, public_key) do
{:ok, sig_valid} ->
sig_valid
{:error, err} ->
Logger.error(err)
false
end
end
def validate_conn(conn) do
# TODO: How to get the right key and see if it is actually valid for that request.
# For now, fetch the key for the actor.
with actor_id <- conn.params["actor"],
{:ok, public_key} <- Account.get_public_key_for_url(actor_id) do
case HTTPSign.verify(conn, public_key) do
{:ok, conn} ->
true
_ ->
Logger.debug("Could not validate, re-fetching user and trying one more time")
# Fetch user anew and try one more time
with actor_id <- conn.params["actor"],
{:ok, _user} <- ActivityPub.make_account_from_url(actor_id),
{:ok, public_key} <- Account.get_public_key_for_url(actor_id) do
case HTTPSign.verify(conn, public_key) do
{:ok, conn} ->
true
{:error, :forbidden} ->
false
end
end
end
else
e ->
Logger.debug("Could not public key!")
Logger.debug(inspect e)
false
end
end
# def validate_conn(conn, public_key) do
# headers = Enum.into(conn.req_headers, %{})
# signature = split_signature(headers["signature"])
# validate(headers, signature, public_key)
# end
def build_signing_string(headers, used_headers) do
used_headers
|> Enum.map(fn header -> "#{header}: #{headers[header]}" end)
|> Enum.join("\n")
end
def sign(account, headers) do
sigstring = build_signing_string(headers, Map.keys(headers))
{:ok, private_key} = Account.get_private_key_for_account(account)
Logger.debug("private_key")
Logger.debug(inspect private_key)
Logger.debug("sigstring")
Logger.debug(inspect sigstring)
{:ok, signature} = HTTPSign.Crypto.sign(:rsa, sigstring, private_key)
Logger.debug(inspect signature)
signature = Base.encode64(signature)
sign = [
keyId: account.url <> "#main-key",
algorithm: "rsa-sha256",
headers: Map.keys(headers) |> Enum.join(" "),
signature: signature
]
|> Enum.map(fn {k, v} -> "#{k}=\"#{v}\"" end)
|> Enum.join(",")
Logger.debug("sign")
Logger.debug(inspect sign)
{:ok, public_key} = Account.get_public_key_for_account(account)
Logger.debug("inspect split signature inside sign")
Logger.debug(inspect split_signature(sign))
Logger.debug(inspect validate(headers, split_signature(sign), public_key))
sign
end
end

View File

@@ -0,0 +1,94 @@
defmodule Eventos.Service.WebFinger do
alias Eventos.Accounts
alias Eventos.Service.XmlBuilder
alias Eventos.Repo
require Jason
require Logger
def host_meta do
base_url = EventosWeb.Endpoint.url()
{
:XRD,
%{xmlns: "http://docs.oasis-open.org/ns/xri/xrd-1.0"},
{
:Link,
%{
rel: "lrdd",
type: "application/xrd+xml",
template: "#{base_url}/.well-known/webfinger?resource={uri}"
}
}
}
|> XmlBuilder.to_doc()
end
def webfinger(resource, "JSON") do
host = EventosWeb.Endpoint.host()
regex = ~r/(acct:)?(?<username>\w+)@#{host}/
with %{"username" => username} <- Regex.named_captures(regex, resource) do
user = Accounts.get_account_by_username(username)
{:ok, represent_user(user, "JSON")}
else
_e ->
with user when not is_nil(user) <- Accounts.get_account_by_url(resource) do
{:ok, represent_user(user, "JSON")}
else
_e ->
{:error, "Couldn't find user"}
end
end
end
def represent_user(user, "JSON") do
%{
"subject" => "acct:#{user.username}@#{EventosWeb.Endpoint.host() <> ":4001"}",
"aliases" => [user.url],
"links" => [
%{"rel" => "self", "type" => "application/activity+json", "href" => user.url},
]
}
end
defp webfinger_from_json(doc) do
data =
Enum.reduce(doc["links"], %{"subject" => doc["subject"]}, fn link, data ->
case {link["type"], link["rel"]} do
{"application/activity+json", "self"} ->
Map.put(data, "url", link["href"])
_ ->
Logger.debug("Unhandled type: #{inspect(link["type"])}")
data
end
end)
{:ok, data}
end
def finger(account) do
account = String.trim_leading(account, "@")
domain =
with [_name, domain] <- String.split(account, "@") do
domain
else
_e ->
URI.parse(account).host
end
address = "http://#{domain}/.well-known/webfinger?resource=acct:#{account}"
with response <- HTTPoison.get(address, [Accept: "application/json"],follow_redirect: true),
{:ok, %{status_code: status_code, body: body}} when status_code in 200..299 <- response do
{:ok, doc} = Jason.decode(body)
webfinger_from_json(doc)
else
e ->
Logger.debug(fn -> "Couldn't finger #{account}" end)
Logger.debug(fn -> inspect(e) end)
{:error, e}
end
end
end

View File

@@ -0,0 +1,44 @@
defmodule Eventos.Service.XmlBuilder do
def to_xml({tag, attributes, content}) do
open_tag = make_open_tag(tag, attributes)
content_xml = to_xml(content)
"<#{open_tag}>#{content_xml}</#{tag}>"
end
def to_xml({tag, %{} = attributes}) do
open_tag = make_open_tag(tag, attributes)
"<#{open_tag} />"
end
def to_xml({tag, content}), do: to_xml({tag, %{}, content})
def to_xml(content) when is_binary(content) do
to_string(content)
end
def to_xml(content) when is_list(content) do
for element <- content do
to_xml(element)
end
|> Enum.join()
end
def to_xml(%NaiveDateTime{} = time) do
NaiveDateTime.to_iso8601(time)
end
def to_doc(content), do: ~s(<?xml version="1.0" encoding="UTF-8"?>) <> to_xml(content)
defp make_open_tag(tag, attributes) do
attributes_string =
for {attribute, value} <- attributes do
"#{attribute}=\"#{value}\""
end
|> Enum.join(" ")
[tag, attributes_string] |> Enum.join(" ") |> String.trim()
end
end