Fix for quote post image handling bug

This commit is contained in:
Linus2punkt0
2024-01-30 09:20:55 +01:00
parent 3abc9972e6
commit a73da84be9
9 changed files with 913 additions and 918 deletions

View File

@@ -1,235 +1,235 @@
from atproto import Client
from settings.auth import BSKY_HANDLE, BSKY_PASSWORD
from settings.paths import *
from settings import settings
from local.functions import write_log, lang_toggle, get_post_time_limit
import urllib.request, random, string, arrow
date_in_format = 'YYYY-MM-DDTHH:mm:ss'
# Setting up connections to bluesky, twitter and mastodon
bsky = Client()
bsky.login(BSKY_HANDLE, BSKY_PASSWORD)
# Getting posts from bluesky
def get_posts(timelimit = arrow.utcnow().shift(hours = -1)):
write_log("Gathering posts")
posts = {}
# Getting feed of user
profile_feed = bsky.app.bsky.feed.get_author_feed({'actor': BSKY_HANDLE})
visibility = settings.visibility
for feed_view in profile_feed.feed:
# If the post was not written by the account that posted it, it is a repost and we skip it.
if feed_view.post.author.handle != BSKY_HANDLE:
continue
repost = False
created_at = arrow.get(feed_view.post.record.created_at.split(".")[0], date_in_format)
if hasattr(feed_view.reason, "indexed_at"):
repost = True
created_at = arrow.get(feed_view.reason.indexed_at.split(".")[0], date_in_format)
# The language settings on posts are used to determine if a post should be crossposted
# to a specific service. Here we check the settings against the language of the post to
# see what service it should post to. We also check if posting for a service is enabled
# at all in the settings. If it shouldn't post to either, we skip it.
langs = feed_view.post.record.langs
mastodon_post = (lang_toggle(langs, "mastodon") and settings.Mastodon)
twitter_post = (lang_toggle(langs, "twitter") and settings.Twitter)
if not mastodon_post and not twitter_post:
continue
# If post has an embed of type record it is a quote post, and should not be crossposted
cid = feed_view.post.cid
text = feed_view.post.record.text
# Facets contains things like urls and mentions, which we need to deal with.
# send_mention is used to keep track of if the mention-settings says for the post to be posted or not.
# Default is True, because if nobody is mentioned it should be posted.
send_mention = True
if feed_view.post.record.facets:
# Sometimes bluesky shortens URLs and in that case we need to restore them before crossposting
text = restore_urls(feed_view.post.record)
# If a user is mentioned the parse_mentioned_username function will deal with it according
# to how the variable "mentions" is set in settings. If it is set to "ignore", nothing is
# done.
if settings.mentions != "ignore":
text, send_mention = parse_mentioned_username(feed_view.post.record, text)
# If "mentions" is set to "skip" a post with a mention should not be crossposted, and parse_mentioned_username will
# return send_mention as False.
if not send_mention:
continue
# Setting reply_to_user to the same as user handle and only changing it if the tweet is an actual reply.
# This way we can just check if the variable is the same as the user handle later and send through
# both tweets that are not replies, and posts that are part of a thread.
reply_to_user = BSKY_HANDLE
reply_to_post = ""
quoted_post = ""
quote_url = ""
# Checking who is allowed to reply to the post
allowed_reply = get_allowed_reply(feed_view.post)
# Checking if post is a quote post. Posts with references to feeds look like quote posts but aren't, and so will fail on missing attribute.
# Since quote posts can give values in two different ways it's a bit of a hassle to double check if it is an actual quote post,
# so instead I just try to run the function and if it fails I skip the post
# If there is some reason you would want to crosspost a post referencing a bluesky-feed that I'm not seeing, I might update this in the future.
if feed_view.post.embed and hasattr(feed_view.post.embed, "record"):
try:
quoted_user, quoted_post, quote_url, open = get_quote_post(feed_view.post.embed.record)
except:
write_log("Post " + cid + " is of a type the crossposter can't parse.", "error")
continue
# If post is a quote post of a post from another user, and quote-posting is disabled in settings
# or the post is not open to users not logged in, the post will be skipped
if quoted_user != BSKY_HANDLE and (not settings.quote_posts or not open):
continue
# If the post is a quote of ourselves, the url to the post is removed (if it was included),
# as we instead want to reference the version of the post from twitter or mastodon.
# If no such post exists, we can add back the link to the bluesky-post later
elif quoted_user == BSKY_HANDLE:
text = text.replace(quote_url, "")
# Checking if post is regular reply
if feed_view.post.record.reply:
reply_to_post = feed_view.post.record.reply.parent.cid
# Poster will try to fetch reply to-username the "ordinary" way,
# and if it fails, it will try getting the entire thread and
# finding it that way
try:
reply_to_user = feed_view.reply.parent.author.handle
except:
reply_to_user = get_reply_to_user(feed_view.post.record.reply.parent)
# If unable to fetch user that was replied to, code will skip this post. If the post was not a
# reply at all, the reply_to_user will still be set to the user account.
if not reply_to_user:
write_log("Unable to find the user that post " + cid + " replies to or quotes", "error")
continue
# Checking if post is withing timelimit and not a reply to someone elses post.
if created_at > timelimit and reply_to_user == BSKY_HANDLE:
# Fetching images if there are any in the post
image_data = ""
images = []
if feed_view.post.embed and hasattr(feed_view.post.embed, "images"):
image_data = feed_view.post.embed.images
elif feed_view.post.embed and hasattr(feed_view.post.embed, "media") and hasattr(feed_view.post.embed, "record"):
image_data = feed_view.post.embed.media.images
# Sometimes posts have included links that are not included in the actual text of the post. This adds adds that back.
if feed_view.post.embed and hasattr(feed_view.post.embed, "external") and hasattr(feed_view.post.embed.external, "uri"):
if feed_view.post.embed.external.uri not in text:
text += '\n'+feed_view.post.embed.external.uri
if image_data:
for image in image_data:
images.append({"url": image.fullsize, "alt": image.alt})
if visibility == "hybrid" and reply_to_post:
visibility = "unlisted"
elif visibility == "hybrid":
visibility = "public"
post_info = {
"text": text,
"reply_to_post": reply_to_post,
"quoted_post": quoted_post,
"quote_url": quote_url,
"images": images,
"visibility": visibility,
"twitter": twitter_post,
"mastodon": mastodon_post,
"allowed_reply": allowed_reply,
"repost": repost,
"timestamp": created_at
}
# Saving post to posts dictionary
posts[cid] = post_info;
return posts
# Function for getting username of person replied to. It can mostly be retrieved from the reply section of the tweet that has been fetched,
# but in cases where the original post in a thread has been deleted it causes some weirdness. Hopefully this resolves it.
def get_reply_to_user(reply):
uri = reply.uri
username = ""
try:
response = bsky.app.bsky.feed.get_post_thread(params={"uri": uri})
username = response.thread.post.author.handle
except:
write_log("Unable to retrieve reply_to-user of post.", "error")
return username
def get_allowed_reply(post):
reply_restriction = post.threadgate
if reply_restriction is None:
return "All"
if len(reply_restriction.record.allow) == 0:
return "None"
if reply_restriction.record.allow[0].py_type == "app.bsky.feed.threadgate#followingRule":
return "Following"
if reply_restriction.record.allow[0].py_type == "app.bsky.feed.threadgate#mentionRule":
return "Mentioned"
return "Unknown"
# Function for restoring shortened URLS
def restore_urls(record):
text = record.text
encoded_text = text.encode("UTF-8")
for facet in record.facets:
if facet.features[0].py_type != "app.bsky.richtext.facet#link":
continue
url = facet.features[0].uri
# The index section designates where a URL starts end ends. Using this we can pick out the exact
# string representing the URL in the post, and replace it with the actual URL.
start = facet.index.byte_start
end = facet.index.byte_end
section = encoded_text[start:end]
shortened = section.decode("UTF-8")
text = text.replace(shortened, url)
return text
def parse_mentioned_username(record, text):
# send_mention keeps track if the post should be sent at all.
send_mention = True
encoded_text = text.encode("UTF-8")
for facet in record.facets:
if facet.features[0].py_type != "app.bsky.richtext.facet#mention":
continue
# The index section designates where a username starts end ends. Using this we can pick out the exact
# string representing the user in the post, and replace it with the corrected value
start = facet.index.byte_start
end = facet.index.byte_end
username = encoded_text[start:end]
username = username.decode("UTF-8")
# If the mentions setting is set to skip, None will be returned, if it's set to strip the
# text will be returned with the @ of the username removed, if it's set to URL the name will
# be replaced with a link to the profile.
if settings.mentions == "skip":
send_mention = False
elif settings.mentions == "strip":
text = text.replace(username, username.replace("@", ""))
elif settings.mentions == "url":
base_url = "https://bsky.app/profile/"
did = facet.features[0].did
url = base_url + did
text = text.replace(username, url)
return text, send_mention
# Quoted posts can be stored in several different ways for some reason. With this
# function we check which one is used and fetches information accordingly.
def get_quote_post(post):
open = True
if isinstance(post, dict):
user = post["record"]["author"]["handle"]
cid = post["record"]["cid"]
uri = post["record"]["uri"]
labels = post["record"]["author"]["labels"]
elif hasattr(post, "author"):
user = post.author.handle
cid = post.cid
uri = post.uri
labels = post.author.labels
else:
user = post.record.author.handle
cid = post.record.cid
uri = post.record.uri
labels = post.record.author.labels
# the val label is used by bluesky to check if a post should be viewable by people
# who are not logged in. When crossposting with a link to a bsky post, we first
# want to make sure that the post in question is publicly available.
if labels and labels[0].val == "!no-unauthenticated":
open = False
url = "https://bsky.app/profile/" + user + "/post/" + uri.split("/")[-1]
return user, cid, url, open
from atproto import Client
from settings.auth import BSKY_HANDLE, BSKY_PASSWORD
from settings.paths import *
from settings import settings
from local.functions import write_log, lang_toggle
import arrow
date_in_format = 'YYYY-MM-DDTHH:mm:ss'
# Setting up connections to bluesky, twitter and mastodon
bsky = Client()
bsky.login(BSKY_HANDLE, BSKY_PASSWORD)
# Getting posts from bluesky
def get_posts(timelimit = arrow.utcnow().shift(hours = -1)):
write_log("Gathering posts")
posts = {}
# Getting feed of user
profile_feed = bsky.app.bsky.feed.get_author_feed({'actor': BSKY_HANDLE})
visibility = settings.visibility
for feed_view in profile_feed.feed:
# If the post was not written by the account that posted it, it is a repost and we skip it.
if feed_view.post.author.handle != BSKY_HANDLE:
continue
repost = False
created_at = arrow.get(feed_view.post.record.created_at.split(".")[0], date_in_format)
if hasattr(feed_view.reason, "indexed_at"):
repost = True
created_at = arrow.get(feed_view.reason.indexed_at.split(".")[0], date_in_format)
# The language settings on posts are used to determine if a post should be crossposted
# to a specific service. Here we check the settings against the language of the post to
# see what service it should post to. We also check if posting for a service is enabled
# at all in the settings. If it shouldn't post to either, we skip it.
langs = feed_view.post.record.langs
mastodon_post = (lang_toggle(langs, "mastodon") and settings.Mastodon)
twitter_post = (lang_toggle(langs, "twitter") and settings.Twitter)
if not mastodon_post and not twitter_post:
continue
# If post has an embed of type record it is a quote post, and should not be crossposted
cid = feed_view.post.cid
text = feed_view.post.record.text
# Facets contains things like urls and mentions, which we need to deal with.
# send_mention is used to keep track of if the mention-settings says for the post to be posted or not.
# Default is True, because if nobody is mentioned it should be posted.
send_mention = True
if feed_view.post.record.facets:
# Sometimes bluesky shortens URLs and in that case we need to restore them before crossposting
text = restore_urls(feed_view.post.record)
# If a user is mentioned the parse_mentioned_username function will deal with it according
# to how the variable "mentions" is set in settings. If it is set to "ignore", nothing is
# done.
if settings.mentions != "ignore":
text, send_mention = parse_mentioned_username(feed_view.post.record, text)
# If "mentions" is set to "skip" a post with a mention should not be crossposted, and parse_mentioned_username will
# return send_mention as False.
if not send_mention:
continue
# Setting reply_to_user to the same as user handle and only changing it if the tweet is an actual reply.
# This way we can just check if the variable is the same as the user handle later and send through
# both tweets that are not replies, and posts that are part of a thread.
reply_to_user = BSKY_HANDLE
reply_to_post = ""
quoted_post = ""
quote_url = ""
# Checking who is allowed to reply to the post
allowed_reply = get_allowed_reply(feed_view.post)
# Checking if post is a quote post. Posts with references to feeds look like quote posts but aren't, and so will fail on missing attribute.
# Since quote posts can give values in two different ways it's a bit of a hassle to double check if it is an actual quote post,
# so instead I just try to run the function and if it fails I skip the post
# If there is some reason you would want to crosspost a post referencing a bluesky-feed that I'm not seeing, I might update this in the future.
if feed_view.post.embed and hasattr(feed_view.post.embed, "record"):
try:
quoted_user, quoted_post, quote_url, open = get_quote_post(feed_view.post.embed.record)
except:
write_log("Post " + cid + " is of a type the crossposter can't parse.", "error")
continue
# If post is a quote post of a post from another user, and quote-posting is disabled in settings
# or the post is not open to users not logged in, the post will be skipped
if quoted_user != BSKY_HANDLE and (not settings.quote_posts or not open):
continue
# If the post is a quote of ourselves, the url to the post is removed (if it was included),
# as we instead want to reference the version of the post from twitter or mastodon.
# If no such post exists, we can add back the link to the bluesky-post later
elif quoted_user == BSKY_HANDLE:
text = text.replace(quote_url, "")
# Checking if post is regular reply
if feed_view.post.record.reply:
reply_to_post = feed_view.post.record.reply.parent.cid
# Poster will try to fetch reply to-username the "ordinary" way,
# and if it fails, it will try getting the entire thread and
# finding it that way
try:
reply_to_user = feed_view.reply.parent.author.handle
except:
reply_to_user = get_reply_to_user(feed_view.post.record.reply.parent)
# If unable to fetch user that was replied to, code will skip this post. If the post was not a
# reply at all, the reply_to_user will still be set to the user account.
if not reply_to_user:
write_log("Unable to find the user that post " + cid + " replies to or quotes", "error")
continue
# Checking if post is withing timelimit and not a reply to someone elses post.
if created_at > timelimit and reply_to_user == BSKY_HANDLE:
# Fetching images if there are any in the post
image_data = ""
images = []
if feed_view.post.embed and hasattr(feed_view.post.embed, "images"):
image_data = feed_view.post.embed.images
elif feed_view.post.embed and hasattr(feed_view.post.embed, "media") and hasattr(feed_view.post.embed.media, "images"):
image_data = feed_view.post.embed.media.images
# Sometimes posts have included links that are not included in the actual text of the post. This adds adds that back.
if feed_view.post.embed and hasattr(feed_view.post.embed, "external") and hasattr(feed_view.post.embed.external, "uri"):
if feed_view.post.embed.external.uri not in text:
text += '\n'+feed_view.post.embed.external.uri
if image_data:
for image in image_data:
images.append({"url": image.fullsize, "alt": image.alt})
if visibility == "hybrid" and reply_to_post:
visibility = "unlisted"
elif visibility == "hybrid":
visibility = "public"
post_info = {
"text": text,
"reply_to_post": reply_to_post,
"quoted_post": quoted_post,
"quote_url": quote_url,
"images": images,
"visibility": visibility,
"twitter": twitter_post,
"mastodon": mastodon_post,
"allowed_reply": allowed_reply,
"repost": repost,
"timestamp": created_at
}
# Saving post to posts dictionary
posts[cid] = post_info;
return posts
# Function for getting username of person replied to. It can mostly be retrieved from the reply section of the tweet that has been fetched,
# but in cases where the original post in a thread has been deleted it causes some weirdness. Hopefully this resolves it.
def get_reply_to_user(reply):
uri = reply.uri
username = ""
try:
response = bsky.app.bsky.feed.get_post_thread(params={"uri": uri})
username = response.thread.post.author.handle
except:
write_log("Unable to retrieve reply_to-user of post.", "error")
return username
def get_allowed_reply(post):
reply_restriction = post.threadgate
if reply_restriction is None:
return "All"
if len(reply_restriction.record.allow) == 0:
return "None"
if reply_restriction.record.allow[0].py_type == "app.bsky.feed.threadgate#followingRule":
return "Following"
if reply_restriction.record.allow[0].py_type == "app.bsky.feed.threadgate#mentionRule":
return "Mentioned"
return "Unknown"
# Function for restoring shortened URLS
def restore_urls(record):
text = record.text
encoded_text = text.encode("UTF-8")
for facet in record.facets:
if facet.features[0].py_type != "app.bsky.richtext.facet#link":
continue
url = facet.features[0].uri
# The index section designates where a URL starts end ends. Using this we can pick out the exact
# string representing the URL in the post, and replace it with the actual URL.
start = facet.index.byte_start
end = facet.index.byte_end
section = encoded_text[start:end]
shortened = section.decode("UTF-8")
text = text.replace(shortened, url)
return text
def parse_mentioned_username(record, text):
# send_mention keeps track if the post should be sent at all.
send_mention = True
encoded_text = text.encode("UTF-8")
for facet in record.facets:
if facet.features[0].py_type != "app.bsky.richtext.facet#mention":
continue
# The index section designates where a username starts end ends. Using this we can pick out the exact
# string representing the user in the post, and replace it with the corrected value
start = facet.index.byte_start
end = facet.index.byte_end
username = encoded_text[start:end]
username = username.decode("UTF-8")
# If the mentions setting is set to skip, None will be returned, if it's set to strip the
# text will be returned with the @ of the username removed, if it's set to URL the name will
# be replaced with a link to the profile.
if settings.mentions == "skip":
send_mention = False
elif settings.mentions == "strip":
text = text.replace(username, username.replace("@", ""))
elif settings.mentions == "url":
base_url = "https://bsky.app/profile/"
did = facet.features[0].did
url = base_url + did
text = text.replace(username, url)
return text, send_mention
# Quoted posts can be stored in several different ways for some reason. With this
# function we check which one is used and fetches information accordingly.
def get_quote_post(post):
open = True
if isinstance(post, dict):
user = post["record"]["author"]["handle"]
cid = post["record"]["cid"]
uri = post["record"]["uri"]
labels = post["record"]["author"]["labels"]
elif hasattr(post, "author"):
user = post.author.handle
cid = post.cid
uri = post.uri
labels = post.author.labels
else:
user = post.record.author.handle
cid = post.record.cid
uri = post.record.uri
labels = post.record.author.labels
# the val label is used by bluesky to check if a post should be viewable by people
# who are not logged in. When crossposting with a link to a bsky post, we first
# want to make sure that the post in question is publicly available.
if labels and labels[0].val == "!no-unauthenticated":
open = False
url = "https://bsky.app/profile/" + user + "/post/" + uri.split("/")[-1]
return user, cid, url, open

View File

@@ -1,131 +1,131 @@
from settings.paths import *
from local.functions import write_log
import json, os, shutil, arrow
# Function for writing new lines to the database
def db_write(skeet, tweet, toot, failed, database):
ids = {
"twitter_id": tweet,
"mastodon_id": toot
}
data = {
"ids": ids,
"failed": failed
}
# When running, the code saves the database to memory, so instead of just saving the post to the database file,
# we also save it to the open database. This also overwrites the version of the post in memory in case
# an ID that was missing because of a previous failure.
database[skeet] = data
row = {
"skeet": skeet,
"ids": ids,
"failed": failed
}
json_string = json.dumps(row)
# If the database file exists we want to append to it, otherwise we create it anew.
if os.path.exists(database_path):
append_write = 'a'
else:
append_write = 'w'
# Skipping adding posts to db file if they are already in it.
if not is_in_db(json_string):
write_log("Adding to database: " + json_string)
file = open(database_path, append_write)
file.write(json_string + "\n")
file.close()
return database
# Function for reading database file and saving values in a dictionary
def db_read():
database = {}
if not os.path.exists(database_path):
return database
with open(database_path, 'r') as file:
for line in file:
try:
json_line = json.loads(line)
except:
continue
skeet = json_line["skeet"]
ids = json_line["ids"]
ids = db_convert(ids)
failed = {"twitter": 0, "mastodon": 0}
if "failed" in json_line:
failed = json_line["failed"]
line_data = {
"ids": ids,
"failed": failed
}
database[skeet] = line_data
return database;
# After changing from camelCase to snake_case, old database entries will have to be converted.
def db_convert(ids_in):
ids_out = {}
try:
ids_out["twitter_id"] = ids_in["twitter_id"]
except:
ids_out["twitter_id"] = ids_in["twitterId"]
try:
ids_out["mastodon_id"] = ids_in["mastodon_id"]
except:
ids_out["mastodon_id"] = ids_in["mastodonId"]
return ids_out
# Function for checking if a line is already in the database-file
def is_in_db(line):
if not os.path.exists(database_path):
return False
with open(database_path, 'r') as file:
content = file.read()
if line in content:
return True
else:
return False
# Since we are working with a version of the database in memory, at the end of the run
# we completely overwrite the database on file with the one in memory.
# This does kind of make it uneccessary to write each new post to the file while running,
# but in case the program fails halfway through it gives us kind of a backup.
def save_db(database):
write_log("Saving new database")
append_write = "w"
for skeet in database:
row = {
"skeet": skeet,
"ids": database[skeet]["ids"],
"failed": database[skeet]["failed"]
}
jsonString = json.dumps(row)
file = open(database_path, append_write)
file.write(jsonString + "\n")
file.close()
append_write = "a"
# Every twelve hours a backup of the database is saved, in case something happens to the live database.
# If the live database contains fewer lines than the backup it means something has probably gone wrong,
# and before the live database is saved as a backup, the current backup is saved as a new file, so that
# it can be recovered later.
def db_backup():
if not os.path.isfile(database_path) or (os.path.isfile(backup_path)
and arrow.Arrow.fromtimestamp(os.stat(backup_path).st_mtime) > arrow.utcnow().shift(hours = -24)):
return
if os.path.isfile(backup_path):
if count_lines(backup_path) < count_lines(database_path):
os.remove(backup_path)
else:
date = arrow.utcnow().format("YYMMDD")
os.rename(backup_path, backup_path + "_" + date)
write_log("Current backup file contains more entries than current live database, backup saved", "error")
shutil.copyfile(database_path, backup_path)
write_log("Backup of database taken")
# Function for counting lines in a file
def count_lines(file):
count = 0;
with open(file, 'r') as file:
for count, line in enumerate(file):
pass
from settings.paths import *
from local.functions import write_log
import json, os, shutil, arrow
# Function for writing new lines to the database
def db_write(skeet, tweet, toot, failed, database):
ids = {
"twitter_id": tweet,
"mastodon_id": toot
}
data = {
"ids": ids,
"failed": failed
}
# When running, the code saves the database to memory, so instead of just saving the post to the database file,
# we also save it to the open database. This also overwrites the version of the post in memory in case
# an ID that was missing because of a previous failure.
database[skeet] = data
row = {
"skeet": skeet,
"ids": ids,
"failed": failed
}
json_string = json.dumps(row)
# If the database file exists we want to append to it, otherwise we create it anew.
if os.path.exists(database_path):
append_write = 'a'
else:
append_write = 'w'
# Skipping adding posts to db file if they are already in it.
if not is_in_db(json_string):
write_log("Adding to database: " + json_string)
file = open(database_path, append_write)
file.write(json_string + "\n")
file.close()
return database
# Function for reading database file and saving values in a dictionary
def db_read():
database = {}
if not os.path.exists(database_path):
return database
with open(database_path, 'r') as file:
for line in file:
try:
json_line = json.loads(line)
except:
continue
skeet = json_line["skeet"]
ids = json_line["ids"]
ids = db_convert(ids)
failed = {"twitter": 0, "mastodon": 0}
if "failed" in json_line:
failed = json_line["failed"]
line_data = {
"ids": ids,
"failed": failed
}
database[skeet] = line_data
return database;
# After changing from camelCase to snake_case, old database entries will have to be converted.
def db_convert(ids_in):
ids_out = {}
try:
ids_out["twitter_id"] = ids_in["twitter_id"]
except:
ids_out["twitter_id"] = ids_in["twitterId"]
try:
ids_out["mastodon_id"] = ids_in["mastodon_id"]
except:
ids_out["mastodon_id"] = ids_in["mastodonId"]
return ids_out
# Function for checking if a line is already in the database-file
def is_in_db(line):
if not os.path.exists(database_path):
return False
with open(database_path, 'r') as file:
content = file.read()
if line in content:
return True
else:
return False
# Since we are working with a version of the database in memory, at the end of the run
# we completely overwrite the database on file with the one in memory.
# This does kind of make it uneccessary to write each new post to the file while running,
# but in case the program fails halfway through it gives us kind of a backup.
def save_db(database):
write_log("Saving new database")
append_write = "w"
for skeet in database:
row = {
"skeet": skeet,
"ids": database[skeet]["ids"],
"failed": database[skeet]["failed"]
}
jsonString = json.dumps(row)
file = open(database_path, append_write)
file.write(jsonString + "\n")
file.close()
append_write = "a"
# Every twelve hours a backup of the database is saved, in case something happens to the live database.
# If the live database contains fewer lines than the backup it means something has probably gone wrong,
# and before the live database is saved as a backup, the current backup is saved as a new file, so that
# it can be recovered later.
def db_backup():
if not os.path.isfile(database_path) or (os.path.isfile(backup_path)
and arrow.Arrow.fromtimestamp(os.stat(backup_path).st_mtime) > arrow.utcnow().shift(hours = -24)):
return
if os.path.isfile(backup_path):
if count_lines(backup_path) < count_lines(database_path):
os.remove(backup_path)
else:
date = arrow.utcnow().format("YYMMDD")
os.rename(backup_path, backup_path + "_" + date)
write_log("Current backup file contains more entries than current live database, backup saved", "error")
shutil.copyfile(database_path, backup_path)
write_log("Backup of database taken")
# Function for counting lines in a file
def count_lines(file):
count = 0;
with open(file, 'r') as file:
for count, line in enumerate(file):
pass
return count

View File

@@ -1,114 +1,116 @@
from settings.auth import *
from settings.paths import *
from local.functions import *
import settings.settings as settings
import os, shutil, re, arrow
# This function uses the language selection as a way to select which posts should be crossposted.
def lang_toggle(langs, service):
if service == "twitter":
lang_toggle = settings.twitter_lang
elif service == "mastodon":
lang_toggle = settings.mastodon_lang
else:
write_log("Something has gone very wrong.", "error")
exit()
if not lang_toggle:
return True
if langs and lang_toggle in langs:
return (not settings.post_default)
else:
return settings.post_default
# Function for correctly counting post length
def post_length(post):
# Twitter shortens urls to 23 characters
short_url_length = 23
length = len(post)
# Finding all urls and calculating how much shorter the post will be after shortening
regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"
urls = re.findall(regex, post)
for url in urls:
url_length = len(url[0])
if url_length > short_url_length:
length = length - (url_length - short_url_length)
return length
# Function for writing to the log file
def write_log(message, type = "message"):
if settings.log_level == "none" or (settings.log_level == "error" and type == "message"):
return;
now = arrow.utcnow().format("DD/MM/YYYY HH:mm:ss")
date = arrow.utcnow().format("YYMMDD")
message = str(now) + " (" + type.upper() + "): " + str(message) + "\n"
print(message)
log = log_path + date + ".log"
if os.path.exists(log):
append_write = 'a'
else:
append_write = 'w'
dst = open(log, append_write)
dst.write(message)
dst.close()
# Cleaning up downloaded images
def cleanup():
write_log("Deleting local images")
for filename in os.listdir(image_path):
file_path = os.path.join(image_path, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
write_log('Failed to delete %s. Reason: %s' % (file_path, e), "error")
# Following two functions deals with the post per hour limit
# Function for reading post log and checking number of posts sent in last hour
def post_cache_read():
write_log("Reading cache of recent posts.")
cache = {}
timelimit = arrow.utcnow().shift(hours = -1)
if not os.path.exists(post_cache_path):
write_log(post_cache_path + " not found.")
return cache
with open(post_cache_path, 'r') as file:
for line in file:
try:
post_id = line.split(";")[0]
timestamp = int(line.split(".")[1])
timestamp = arrow.Arrow.fromtimestamp(timestamp)
except Exception as error:
write_log(error, "error")
continue
if timestamp > timelimit:
cache[post_id] = timestamp
return cache;
def post_cache_write(cache):
write_log("Saving post cache.")
append_write = "w"
for post_id in cache:
timestamp = str(cache[post_id].timestamp())
file = open(post_cache_path, append_write)
file.write(post_id + ";" + timestamp + "\n")
file.close()
append_write = "a"
# The timelimit specifies the cutoff time for which posts are crossposted. This is usually based on the
# post_time_limit in settings, but if overflow_posts is set to "skip", meaning any posts that could
# not be posted due to the hourly post max limit is to be skipped, then the timelimit is instead set to
# when the last post was sent.
def get_post_time_limit(cache):
timelimit = arrow.utcnow().shift(hours = -settings.post_time_limit)
if settings.overflow_posts != "skip":
return timelimit
for post_id in cache:
if timelimit < cache[post_id]:
timelimit = cache[post_id]
return timelimit
from settings.auth import *
from settings.paths import *
from local.functions import *
import settings.settings as settings
import os, shutil, re, arrow
# This function uses the language selection as a way to select which posts should be crossposted.
def lang_toggle(langs, service):
if service == "twitter":
lang_toggle = settings.twitter_lang
elif service == "mastodon":
lang_toggle = settings.mastodon_lang
else:
write_log("Something has gone very wrong.", "error")
exit()
if not lang_toggle:
return True
if langs and lang_toggle in langs:
return (not settings.post_default)
else:
return settings.post_default
# Function for correctly counting post length
def post_length(post):
# Twitter shortens urls to 23 characters
short_url_length = 23
length = len(post)
# Finding all urls and calculating how much shorter the post will be after shortening
regex = r"(?i)\b((?:https?://|www\d{0,3}[.]|[a-z0-9.\-]+[.][a-z]{2,4}/)(?:[^\s()<>]+|\(([^\s()<>]+|(\([^\s()<>]+\)))*\))+(?:\(([^\s()<>]+|(\([^\s()<>]+\)))*\)|[^\s`!()\[\]{};:'\".,<>?«»“”‘’]))"
urls = re.findall(regex, post)
for url in urls:
url_length = len(url[0])
if url_length > short_url_length:
length = length - (url_length - short_url_length)
return length
# Function for writing to the log file
def write_log(message, type = "message"):
if settings.log_level == "none" or (settings.log_level == "error" and type == "message"):
return;
now = arrow.utcnow().format("DD/MM/YYYY HH:mm:ss")
date = arrow.utcnow().format("YYMMDD")
message = str(now) + " (" + type.upper() + "): " + str(message) + "\n"
print(message)
log = log_path + date + ".log"
if os.path.exists(log):
append_write = 'a'
else:
append_write = 'w'
dst = open(log, append_write)
dst.write(message)
dst.close()
# Cleaning up downloaded images
def cleanup():
write_log("Deleting local images")
for filename in os.listdir(image_path):
if (filename == ".gitignore"):
continue
file_path = os.path.join(image_path, filename)
try:
if os.path.isfile(file_path) or os.path.islink(file_path):
os.unlink(file_path)
elif os.path.isdir(file_path):
shutil.rmtree(file_path)
except Exception as e:
write_log('Failed to delete %s. Reason: %s' % (file_path, e), "error")
# Following two functions deals with the post per hour limit
# Function for reading post log and checking number of posts sent in last hour
def post_cache_read():
write_log("Reading cache of recent posts.")
cache = {}
timelimit = arrow.utcnow().shift(hours = -1)
if not os.path.exists(post_cache_path):
write_log(post_cache_path + " not found.")
return cache
with open(post_cache_path, 'r') as file:
for line in file:
try:
post_id = line.split(";")[0]
timestamp = int(line.split(".")[1])
timestamp = arrow.Arrow.fromtimestamp(timestamp)
except Exception as error:
write_log(error, "error")
continue
if timestamp > timelimit:
cache[post_id] = timestamp
return cache;
def post_cache_write(cache):
write_log("Saving post cache.")
append_write = "w"
for post_id in cache:
timestamp = str(cache[post_id].timestamp())
file = open(post_cache_path, append_write)
file.write(post_id + ";" + timestamp + "\n")
file.close()
append_write = "a"
# The timelimit specifies the cutoff time for which posts are crossposted. This is usually based on the
# post_time_limit in settings, but if overflow_posts is set to "skip", meaning any posts that could
# not be posted due to the hourly post max limit is to be skipped, then the timelimit is instead set to
# when the last post was sent.
def get_post_time_limit(cache):
timelimit = arrow.utcnow().shift(hours = -settings.post_time_limit)
if settings.overflow_posts != "skip":
return timelimit
for post_id in cache:
if timelimit < cache[post_id]:
timelimit = cache[post_id]
return timelimit

View File

@@ -1,46 +1,46 @@
from mastodon import Mastodon
from settings import settings
from settings.auth import *
from local.functions import write_log
if settings.Mastodon:
mastodon = Mastodon(
access_token = MASTODON_TOKEN,
api_base_url = MASTODON_INSTANCE
)
# More or less the exact same function as for tweeting, but for tooting.
def toot(post, reply_to_post, quoted_post, images, visibility = "unlisted"):
# Since mastodon does not have a quote repost function, quote posts are turned into replies. If the post is both
# a reply and a quote post, the quote is replaced with a url to the post quoted.
if reply_to_post is None and quoted_post:
reply_to_post = quoted_post
elif reply_to_post is not None and quoted_post:
post_url = MASTODON_INSTANCE + "@" + MASTODON_USER + "/" + str(quoted_post)
post += "\n" + post_url
media_ids = []
# If post includes images, images are uploaded so that they can be included in the toot
if images:
for image in images:
filename = image["filename"]
alt = image["alt"]
# If alt text was added to the image on bluesky, it's also added to the image on mastodon,
# otherwise it will be uploaded without alt text.
if alt:
write_log("Uploading image " + filename + " with alt: " + alt + " to mastodon")
res = mastodon.media_post(filename, description=alt)
else:
write_log("Uploading image " + filename)
res = mastodon.media_post(filename)
media_ids.append(res.id)
# I wanted to make this part a little neater, but didn't get it to work and gave up. So here we are.
# If post is both reply and has images it is posted as both a reply and with images (duh).
# If just either of the two it is posted with just that, and if neither it is just posted as a text post.
a = mastodon.status_post(post, in_reply_to_id=reply_to_post, media_ids=media_ids, visibility=visibility)
write_log("Posted to mastodon")
id = a["id"]
return id
def retoot(toot_id):
mastodon.status_reblog(toot_id)
from mastodon import Mastodon
from settings import settings
from settings.auth import *
from local.functions import write_log
if settings.Mastodon:
mastodon = Mastodon(
access_token = MASTODON_TOKEN,
api_base_url = MASTODON_INSTANCE
)
# More or less the exact same function as for tweeting, but for tooting.
def toot(post, reply_to_post, quoted_post, images, visibility = "unlisted"):
# Since mastodon does not have a quote repost function, quote posts are turned into replies. If the post is both
# a reply and a quote post, the quote is replaced with a url to the post quoted.
if reply_to_post is None and quoted_post:
reply_to_post = quoted_post
elif reply_to_post is not None and quoted_post:
post_url = MASTODON_INSTANCE + "@" + MASTODON_USER + "/" + str(quoted_post)
post += "\n" + post_url
media_ids = []
# If post includes images, images are uploaded so that they can be included in the toot
if images:
for image in images:
filename = image["filename"]
alt = image["alt"]
# If alt text was added to the image on bluesky, it's also added to the image on mastodon,
# otherwise it will be uploaded without alt text.
if alt:
write_log("Uploading image " + filename + " with alt: " + alt + " to mastodon")
res = mastodon.media_post(filename, description=alt)
else:
write_log("Uploading image " + filename)
res = mastodon.media_post(filename)
media_ids.append(res.id)
# I wanted to make this part a little neater, but didn't get it to work and gave up. So here we are.
# If post is both reply and has images it is posted as both a reply and with images (duh).
# If just either of the two it is posted with just that, and if neither it is just posted as a text post.
a = mastodon.status_post(post, in_reply_to_id=reply_to_post, media_ids=media_ids, visibility=visibility)
write_log("Posted to mastodon")
id = a["id"]
return id
def retoot(toot_id):
mastodon.status_reblog(toot_id)
write_log("Boosted toot " + str(toot_id))

View File

@@ -1,179 +1,179 @@
import random, string, urllib, arrow
from settings import settings
from settings.paths import *
from local.functions import write_log
from local.db import db_write
from output.twitter import tweet, retweet
from output.mastodon import toot, retoot
def post(posts, database, post_cache):
# The updates status is set to false until anything has been altered in the databse. If nothing has been posted in a run, we skip resaving the database.
updates = False
# Running through the posts dictionary reversed, to get oldest posts first.
for cid in reversed(list(posts.keys())):
post = posts[cid]
# Checking if a maximum amount of posts per hour is set, and if so if it has been reached.
if settings.max_per_hour != 0 and len(post_cache) >= settings.max_per_hour:
write_log("Max posts per hour reached.")
break
# If a post is posted, we want to add a timestamp to the post_cache. Since there are several
# reasons why a post might not be posted, we start out with this set to false for each post,
# and change it to true if a post is actually sent.
posted = False
# Checking if the post is already in the database, and in that case getting the IDs for the post
# on twitter and mastodon. If one or both of these IDs are empty, post will be sent.
# Also checking the existing fail count against the max_retries set in settings, to avoid
# retrying a failure so much that the poster gets ratelimited
tweet_id = ""
toot_id = ""
t_fail = 0
m_fail = 0
if cid in database:
tweet_id = database[cid]["ids"]["twitter_id"]
toot_id = database[cid]["ids"]["mastodon_id"]
t_fail = database[cid]["failed"]["twitter"]
m_fail = database[cid]["failed"]["mastodon"]
if m_fail >= settings.max_retries:
write_log("Error limit reached, not posting to Mastodon", "error")
if not toot_id:
updates = True
toot_id = "FailedToPost"
if t_fail >= settings.max_retries:
write_log("Error limit reached, not posting to Twitter", "error")
if not tweet_id:
updates = True
tweet_id = "FailedToPost"
text = post["text"]
reply_to_post = post["reply_to_post"]
quoted_post = post["quoted_post"]
quote_url = post["quote_url"]
images = post["images"]
visibility = post["visibility"]
allowed_reply = post["allowed_reply"]
tweet_reply = ""
toot_reply = ""
tweet_quote = ""
toot_quote = ""
# If the post has already been sent to both twitter and mastodon and is not a repost, no
# further action is needed.
if tweet_id and toot_id and not post["repost"]:
continue
# If a retweet is found within the last hour, we check the cache to see if it has already been retweeted
repost_timelimit = arrow.utcnow().shift(hours = -1)
if cid in post_cache:
repost_timelimit = post_cache[cid]
# If it is a reply, we get the IDs of the posts we want to reply to from the database.
# If post is not found in database, we can't continue the thread on mastodon and twitter,
# and so we skip it.
if reply_to_post in database:
tweet_reply = database[reply_to_post]["ids"]["twitter_id"]
toot_reply = database[reply_to_post]["ids"]["mastodon_id"]
elif reply_to_post and reply_to_post not in database:
write_log("Post " + cid + " was a reply to a post that is not in the database.", "error")
continue
# If post is a quote post we get the IDs of the posts we want to quote from the database.
# If the posts are not found in the database we check if the quote_post setting is true or false in settings.
# If true we add the URL of the bluesky post to the text of the post, if false we skip the post.
if quoted_post in database:
tweet_quote = database[quoted_post]["ids"]["twitter_id"]
toot_quote = database[quoted_post]["ids"]["mastodon_id"]
elif quoted_post and quoted_post not in database:
if settings.quote_posts and quote_url not in text:
text += "\n" + quote_url
elif not settings.quote_posts:
write_log("Post " + cid + " was a quote of a post that is not in the database.", "error")
continue
# In case the tweet or toot reply/quote variables are empty, we set them to None, to make sure they are in the correct format for
# the api requests. This is not necessary for the toot_quote variable, as it is not sent as a parameter in itself anyway.
if not tweet_reply:
tweet_reply = None
if not toot_reply:
toot_reply = None
if not tweet_quote:
tweet_quote = None
# If either tweet or toot has not previously been posted, we download images (given the post includes images).
if images and (not tweet_id or not toot_id):
images = get_images(images)
# If mastodon is set to false, the post is not sent to mastodon.
if not post["twitter"]:
toot_id = "skipped"
write_log("Not posting to Twitter because posting was set to false.")
elif tweet_id and not post["repost"]:
write_log("Post " + cid + " already sent to twitter.")
# if the post already exists and is a repost, we check if it has already been reposted, and if not, repost it.
elif tweet_id and post["repost"] and post["timestamp"] > repost_timelimit:
try:
# This is where retweets would go if they weren't locked behind a paywall.
pass
# retweet(tweet_id)
# posted = True
except Exception as error:
write_log(error, "error")
# Trying to post to twitter and mastodon. If posting fails the post ID for each service is set to an
# empty string, letting the code know it should try again next time the code is run.
elif not tweet_id and tweet_reply != "skipped" and tweet_reply != "FailedToPost":
updates = True
try:
tweet_id = tweet(text, tweet_reply, tweet_quote, images, allowed_reply)
posted = True
except Exception as error:
write_log(error, "error")
t_fail += 1
tweet_id = ""
# If a tweet failes as a duplicate post, we don't want to try sending it again.
if "duplicate content" in str(error):
t_fail = settings.max_retries
tweet_id = "duplicate"
else:
write_log("Not posting " + cid + " to Twitter")
# If mastodon is set to false, the post is not sent to mastodon.
if not post["mastodon"]:
toot_id = "skipped"
write_log("Not posting to Mastodon because posting was set to false.")
elif toot_id and not post["repost"]:
write_log("Post " + cid + " already sent to mastodon.")
# if the post already exists and is a repost, we check if it has already been reposted, and if not, repost it.
elif toot_id and post["repost"] and post["timestamp"] > repost_timelimit:
try:
retoot(toot_id)
posted = True
except Exception as error:
write_log(error, "error")
# Mastodon does not have a quote retweet function, so those will just be sent as replies.
elif not toot_id and toot_reply != "skipped" and toot_reply != "FailedToPost":
updates = True
try:
toot_id = toot(text, toot_reply, toot_quote, images, visibility)
posted = True
except Exception as error:
write_log(error, "error")
m_fail += 1
toot_id = ""
else:
write_log("Not posting " + cid + " to Mastodon")
# Saving post to database
database = db_write(cid, tweet_id, toot_id, {"twitter": t_fail, "mastodon": m_fail}, database)
if posted:
post_cache[cid] = arrow.utcnow()
return updates, database, post_cache
# Function for getting included images. If no images are included, an empty list will be returned,
# and the posting functions will know not to include any images.
def get_images(images):
local_images = []
for image in images:
# Getting alt text for image. If there is none this will be an empty string.
alt = image["alt"]
# Giving the image just a random filename
filename = ''.join(random.choice(string.ascii_lowercase) for i in range(10)) + ".jpg"
filename = image_path + filename
# Downloading fullsize version of image
urllib.request.urlretrieve(image["url"], filename)
# Saving image info in a dictionary and adding it to the list.
image_info = {
"filename": filename,
"alt": alt
}
local_images.append(image_info)
import random, string, urllib, arrow
from settings import settings
from settings.paths import *
from local.functions import write_log
from local.db import db_write
from output.twitter import tweet, retweet
from output.mastodon import toot, retoot
def post(posts, database, post_cache):
# The updates status is set to false until anything has been altered in the databse. If nothing has been posted in a run, we skip resaving the database.
updates = False
# Running through the posts dictionary reversed, to get oldest posts first.
for cid in reversed(list(posts.keys())):
post = posts[cid]
# Checking if a maximum amount of posts per hour is set, and if so if it has been reached.
if settings.max_per_hour != 0 and len(post_cache) >= settings.max_per_hour:
write_log("Max posts per hour reached.")
break
# If a post is posted, we want to add a timestamp to the post_cache. Since there are several
# reasons why a post might not be posted, we start out with this set to false for each post,
# and change it to true if a post is actually sent.
posted = False
# Checking if the post is already in the database, and in that case getting the IDs for the post
# on twitter and mastodon. If one or both of these IDs are empty, post will be sent.
# Also checking the existing fail count against the max_retries set in settings, to avoid
# retrying a failure so much that the poster gets ratelimited
tweet_id = ""
toot_id = ""
t_fail = 0
m_fail = 0
if cid in database:
tweet_id = database[cid]["ids"]["twitter_id"]
toot_id = database[cid]["ids"]["mastodon_id"]
t_fail = database[cid]["failed"]["twitter"]
m_fail = database[cid]["failed"]["mastodon"]
if m_fail >= settings.max_retries:
write_log("Error limit reached, not posting to Mastodon", "error")
if not toot_id:
updates = True
toot_id = "FailedToPost"
if t_fail >= settings.max_retries:
write_log("Error limit reached, not posting to Twitter", "error")
if not tweet_id:
updates = True
tweet_id = "FailedToPost"
text = post["text"]
reply_to_post = post["reply_to_post"]
quoted_post = post["quoted_post"]
quote_url = post["quote_url"]
images = post["images"]
visibility = post["visibility"]
allowed_reply = post["allowed_reply"]
tweet_reply = ""
toot_reply = ""
tweet_quote = ""
toot_quote = ""
# If the post has already been sent to both twitter and mastodon and is not a repost, no
# further action is needed.
if tweet_id and toot_id and not post["repost"]:
continue
# If a retweet is found within the last hour, we check the cache to see if it has already been retweeted
repost_timelimit = arrow.utcnow().shift(hours = -1)
if cid in post_cache:
repost_timelimit = post_cache[cid]
# If it is a reply, we get the IDs of the posts we want to reply to from the database.
# If post is not found in database, we can't continue the thread on mastodon and twitter,
# and so we skip it.
if reply_to_post in database:
tweet_reply = database[reply_to_post]["ids"]["twitter_id"]
toot_reply = database[reply_to_post]["ids"]["mastodon_id"]
elif reply_to_post and reply_to_post not in database:
write_log("Post " + cid + " was a reply to a post that is not in the database.", "error")
continue
# If post is a quote post we get the IDs of the posts we want to quote from the database.
# If the posts are not found in the database we check if the quote_post setting is true or false in settings.
# If true we add the URL of the bluesky post to the text of the post, if false we skip the post.
if quoted_post in database:
tweet_quote = database[quoted_post]["ids"]["twitter_id"]
toot_quote = database[quoted_post]["ids"]["mastodon_id"]
elif quoted_post and quoted_post not in database:
if settings.quote_posts and quote_url not in text:
text += "\n" + quote_url
elif not settings.quote_posts:
write_log("Post " + cid + " was a quote of a post that is not in the database.", "error")
continue
# In case the tweet or toot reply/quote variables are empty, we set them to None, to make sure they are in the correct format for
# the api requests. This is not necessary for the toot_quote variable, as it is not sent as a parameter in itself anyway.
if not tweet_reply:
tweet_reply = None
if not toot_reply:
toot_reply = None
if not tweet_quote:
tweet_quote = None
# If either tweet or toot has not previously been posted, we download images (given the post includes images).
if images and (not tweet_id or not toot_id):
images = get_images(images)
# If mastodon is set to false, the post is not sent to mastodon.
if not post["twitter"]:
toot_id = "skipped"
write_log("Not posting to Twitter because posting was set to false.")
elif tweet_id and not post["repost"]:
write_log("Post " + cid + " already sent to twitter.")
# if the post already exists and is a repost, we check if it has already been reposted, and if not, repost it.
elif tweet_id and post["repost"] and post["timestamp"] > repost_timelimit:
try:
# This is where retweets would go if they weren't locked behind a paywall.
pass
# retweet(tweet_id)
# posted = True
except Exception as error:
write_log(error, "error")
# Trying to post to twitter and mastodon. If posting fails the post ID for each service is set to an
# empty string, letting the code know it should try again next time the code is run.
elif not tweet_id and tweet_reply != "skipped" and tweet_reply != "FailedToPost":
updates = True
try:
tweet_id = tweet(text, tweet_reply, tweet_quote, images, allowed_reply)
posted = True
except Exception as error:
write_log(error, "error")
t_fail += 1
tweet_id = ""
# If a tweet failes as a duplicate post, we don't want to try sending it again.
if "duplicate content" in str(error):
t_fail = settings.max_retries
tweet_id = "duplicate"
else:
write_log("Not posting " + cid + " to Twitter")
# If mastodon is set to false, the post is not sent to mastodon.
if not post["mastodon"]:
toot_id = "skipped"
write_log("Not posting to Mastodon because posting was set to false.")
elif toot_id and not post["repost"]:
write_log("Post " + cid + " already sent to mastodon.")
# if the post already exists and is a repost, we check if it has already been reposted, and if not, repost it.
elif toot_id and post["repost"] and post["timestamp"] > repost_timelimit:
try:
retoot(toot_id)
posted = True
except Exception as error:
write_log(error, "error")
# Mastodon does not have a quote retweet function, so those will just be sent as replies.
elif not toot_id and toot_reply != "skipped" and toot_reply != "FailedToPost":
updates = True
try:
toot_id = toot(text, toot_reply, toot_quote, images, visibility)
posted = True
except Exception as error:
write_log(error, "error")
m_fail += 1
toot_id = ""
else:
write_log("Not posting " + cid + " to Mastodon")
# Saving post to database
database = db_write(cid, tweet_id, toot_id, {"twitter": t_fail, "mastodon": m_fail}, database)
if posted:
post_cache[cid] = arrow.utcnow()
return updates, database, post_cache
# Function for getting included images. If no images are included, an empty list will be returned,
# and the posting functions will know not to include any images.
def get_images(images):
local_images = []
for image in images:
# Getting alt text for image. If there is none this will be an empty string.
alt = image["alt"]
# Giving the image just a random filename
filename = ''.join(random.choice(string.ascii_lowercase) for i in range(10)) + ".jpg"
filename = image_path + filename
# Downloading fullsize version of image
urllib.request.urlretrieve(image["url"], filename)
# Saving image info in a dictionary and adding it to the list.
image_info = {
"filename": filename,
"alt": alt
}
local_images.append(image_info)
return local_images

View File

@@ -1,91 +1,91 @@
import tweepy
from settings import settings
from settings.auth import *
from local.functions import write_log
if settings.Twitter:
twitter_client = tweepy.Client(consumer_key=TWITTER_APP_KEY,
consumer_secret=TWITTER_APP_SECRET,
access_token=TWITTER_ACCESS_TOKEN,
access_token_secret=TWITTER_ACCESS_TOKEN_SECRET)
tweepy_auth = tweepy.OAuth1UserHandler(TWITTER_APP_KEY, TWITTER_APP_SECRET, TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET)
twitter_api = tweepy.API(tweepy_auth)
# Function for posting tweets
def tweet(post, reply_to_post, quoted_post, images, allowed_reply):
media_ids = None
reply_settings = set_reply_settings(allowed_reply)
# If post includes images, images are uploaded so that they can be included in the tweet
if images:
media_ids = []
for image in images:
filename = image["filename"]
alt = image["alt"]
if len(alt) > 1000:
alt = alt[:996] + "..."
res = twitter_api.media_upload(filename)
id = res.media_id
# If alt text was added to the image on bluesky, it's also added to the image on twitter.
if alt:
write_log("Uploading image " + filename + " with alt: " + alt + " to twitter")
twitter_api.create_media_metadata(id, alt)
media_ids.append(id)
# Checking if the post is longer than 280 characters, and if so sending to the
# splitPost-function.
partTwo = ""
if len(post) > 280:
post, partTwo = split_post(post)
# If the function does not return a post, splitting failed and we will skip this post.
if not post:
return "skipped"
a = twitter_client.create_tweet(text=post, reply_settings=reply_settings, quote_tweet_id=quoted_post, in_reply_to_tweet_id=reply_to_post, media_ids=media_ids)
write_log("Posted to twitter")
id = a[0]["id"]
if partTwo:
a = twitter_client.create_tweet(text=partTwo, in_reply_to_tweet_id=id)
id = a[0]["id"]
return id
def retweet(tweet_id):
a = twitter_client.retweet(tweet_id)
write_log("retweeted tweet " + str(tweet_id))
# Function for splitting up posts that are too long for twitter.
def split_post(text):
write_log("Splitting post that is too long for twitter.")
first = text
# We first try to split the post into sentences, and send as many as can fit in the first one,
# and the rest in the second.
sentences = text.split(". ")
i = 1
while len(first) > 280 and i < len(sentences):
first = ".".join(sentences[:(len(sentences) - i)]) + "."
second = ".".join(sentences[(len(sentences) - i):])
i += 1
# If splitting by sentance does not result in a short enough post, we try splitting by words instead.
if len(first) > 280:
first = text
words = text.split(" ")
i = 1
while len(first) > 280 and i < len(words):
first = " ".join(words[:(len(words) - i)])
second = " ".join(words[(len(words) - i):])
i += 1
# If splitting has ended up with either a first or second part that is too long, we return empty
# strings and the post is not sent to twitter.
if len(first) > 280 or len(second) > 280:
write_log("Was not able to split post.", "error")
first = ""
second = ""
return first, second
def set_reply_settings(allowed):
reply_settings = None
if allowed == "None" or allowed == "Mentioned":
reply_settings = "mentionedUsers"
elif allowed == "Following":
reply_settings = "following"
return reply_settings
import tweepy
from settings import settings
from settings.auth import *
from local.functions import write_log
if settings.Twitter:
twitter_client = tweepy.Client(consumer_key=TWITTER_APP_KEY,
consumer_secret=TWITTER_APP_SECRET,
access_token=TWITTER_ACCESS_TOKEN,
access_token_secret=TWITTER_ACCESS_TOKEN_SECRET)
tweepy_auth = tweepy.OAuth1UserHandler(TWITTER_APP_KEY, TWITTER_APP_SECRET, TWITTER_ACCESS_TOKEN, TWITTER_ACCESS_TOKEN_SECRET)
twitter_api = tweepy.API(tweepy_auth)
# Function for posting tweets
def tweet(post, reply_to_post, quoted_post, images, allowed_reply):
media_ids = None
reply_settings = set_reply_settings(allowed_reply)
# If post includes images, images are uploaded so that they can be included in the tweet
if images:
media_ids = []
for image in images:
filename = image["filename"]
alt = image["alt"]
if len(alt) > 1000:
alt = alt[:996] + "..."
res = twitter_api.media_upload(filename)
id = res.media_id
# If alt text was added to the image on bluesky, it's also added to the image on twitter.
if alt:
write_log("Uploading image " + filename + " with alt: " + alt + " to twitter")
twitter_api.create_media_metadata(id, alt)
media_ids.append(id)
# Checking if the post is longer than 280 characters, and if so sending to the
# splitPost-function.
partTwo = ""
if len(post) > 280:
post, partTwo = split_post(post)
# If the function does not return a post, splitting failed and we will skip this post.
if not post:
return "skipped"
a = twitter_client.create_tweet(text=post, reply_settings=reply_settings, quote_tweet_id=quoted_post, in_reply_to_tweet_id=reply_to_post, media_ids=media_ids)
write_log("Posted to twitter")
id = a[0]["id"]
if partTwo:
a = twitter_client.create_tweet(text=partTwo, in_reply_to_tweet_id=id)
id = a[0]["id"]
return id
def retweet(tweet_id):
a = twitter_client.retweet(tweet_id)
write_log("retweeted tweet " + str(tweet_id))
# Function for splitting up posts that are too long for twitter.
def split_post(text):
write_log("Splitting post that is too long for twitter.")
first = text
# We first try to split the post into sentences, and send as many as can fit in the first one,
# and the rest in the second.
sentences = text.split(". ")
i = 1
while len(first) > 280 and i < len(sentences):
first = ".".join(sentences[:(len(sentences) - i)]) + "."
second = ".".join(sentences[(len(sentences) - i):])
i += 1
# If splitting by sentance does not result in a short enough post, we try splitting by words instead.
if len(first) > 280:
first = text
words = text.split(" ")
i = 1
while len(first) > 280 and i < len(words):
first = " ".join(words[:(len(words) - i)])
second = " ".join(words[(len(words) - i):])
i += 1
# If splitting has ended up with either a first or second part that is too long, we return empty
# strings and the post is not sent to twitter.
if len(first) > 280 or len(second) > 280:
write_log("Was not able to split post.", "error")
first = ""
second = ""
return first, second
def set_reply_settings(allowed):
reply_settings = None
if allowed == "None" or allowed == "Mentioned":
reply_settings = "mentionedUsers"
elif allowed == "Following":
reply_settings = "following"
return reply_settings

View File

@@ -1,31 +1,31 @@
import os
# All necessary tokens, passwords, etc.
# Your bluesky handle should include your instance, so for example handle.bsky.social if you are on the main one.
BSKY_HANDLE = ""
# Generate an app password in the settings on bluesky. DO NOT use your main password.
BSKY_PASSWORD = ""
# Your mastodon handle. Not needed for authentication, but used for making "quote posts".
MASTODON_HANDLE = ""
# The mastodon instance your account is on.
MASTODON_INSTANCE = ""
# Generate your token in the development settings on your mastodon account. Token must have the permissions to
# post statuses (write:statuses)
MASTODON_TOKEN = ""
# Get api keys and tokens from the twitter developer portal (developer.twitter.com). You need to create a project
# and make sure the access token and secret has read and write permissions.
TWITTER_APP_KEY = ""
TWITTER_APP_SECRET = ""
TWITTER_ACCESS_TOKEN = ""
TWITTER_ACCESS_TOKEN_SECRET = ""
# Override settings with environment variables if they exist
BSKY_HANDLE = os.environ.get('BSKY_HANDLE') if os.environ.get('BSKY_HANDLE') else BSKY_HANDLE
BSKY_PASSWORD = os.environ.get('BSKY_PASSWORD') if os.environ.get('BSKY_PASSWORD') else BSKY_PASSWORD
MASTODON_INSTANCE = os.environ.get('MASTODON_INSTANCE') if os.environ.get('MASTODON_INSTANCE') else MASTODON_INSTANCE
MASTODON_HANDLE = os.environ.get('MASTODON_HANDLE') if os.environ.get('MASTODON_HANDLE') else MASTODON_HANDLE
MASTODON_TOKEN = os.environ.get('MASTODON_TOKEN') if os.environ.get('MASTODON_TOKEN') else MASTODON_TOKEN
TWITTER_APP_KEY = os.environ.get('TWITTER_APP_KEY') if os.environ.get('TWITTER_APP_KEY') else TWITTER_APP_KEY
TWITTER_APP_SECRET = os.environ.get('TWITTER_APP_SECRET') if os.environ.get('TWITTER_APP_SECRET') else TWITTER_APP_SECRET
TWITTER_ACCESS_TOKEN = os.environ.get('TWITTER_ACCESS_TOKEN') if os.environ.get('TWITTER_ACCESS_TOKEN') else TWITTER_ACCESS_TOKEN
import os
# All necessary tokens, passwords, etc.
# Your bluesky handle should include your instance, so for example handle.bsky.social if you are on the main one.
BSKY_HANDLE = ""
# Generate an app password in the settings on bluesky. DO NOT use your main password.
BSKY_PASSWORD = ""
# Your mastodon handle. Not needed for authentication, but used for making "quote posts".
MASTODON_HANDLE = ""
# The mastodon instance your account is on.
MASTODON_INSTANCE = ""
# Generate your token in the development settings on your mastodon account. Token must have the permissions to
# post statuses (write:statuses)
MASTODON_TOKEN = ""
# Get api keys and tokens from the twitter developer portal (developer.twitter.com). You need to create a project
# and make sure the access token and secret has read and write permissions.
TWITTER_APP_KEY = ""
TWITTER_APP_SECRET = ""
TWITTER_ACCESS_TOKEN = ""
TWITTER_ACCESS_TOKEN_SECRET = ""
# Override settings with environment variables if they exist
BSKY_HANDLE = os.environ.get('BSKY_HANDLE') if os.environ.get('BSKY_HANDLE') else BSKY_HANDLE
BSKY_PASSWORD = os.environ.get('BSKY_PASSWORD') if os.environ.get('BSKY_PASSWORD') else BSKY_PASSWORD
MASTODON_INSTANCE = os.environ.get('MASTODON_INSTANCE') if os.environ.get('MASTODON_INSTANCE') else MASTODON_INSTANCE
MASTODON_HANDLE = os.environ.get('MASTODON_HANDLE') if os.environ.get('MASTODON_HANDLE') else MASTODON_HANDLE
MASTODON_TOKEN = os.environ.get('MASTODON_TOKEN') if os.environ.get('MASTODON_TOKEN') else MASTODON_TOKEN
TWITTER_APP_KEY = os.environ.get('TWITTER_APP_KEY') if os.environ.get('TWITTER_APP_KEY') else TWITTER_APP_KEY
TWITTER_APP_SECRET = os.environ.get('TWITTER_APP_SECRET') if os.environ.get('TWITTER_APP_SECRET') else TWITTER_APP_SECRET
TWITTER_ACCESS_TOKEN = os.environ.get('TWITTER_ACCESS_TOKEN') if os.environ.get('TWITTER_ACCESS_TOKEN') else TWITTER_ACCESS_TOKEN
TWITTER_ACCESS_TOKEN_SECRET = os.environ.get('TWITTER_ACCESS_TOKEN_SECRET') if os.environ.get('TWITTER_ACCESS_TOKEN_SECRET') else TWITTER_ACCESS_TOKEN_SECRET

View File

@@ -1,17 +1,17 @@
# This file contains all necessary file and folder paths. Make sure to end folder paths with "/".
# base_path is the path from root to the lowest common denominator for all of the other paths.
# Using an absolute path is especially important if running via cron.
base_path = "./"
# Path to the database file. If you want it somewhere other than directly in the base path you can
# either write the entire path manually, or just add the rest of the path on top of the basePath.
database_path = base_path + "db/database.json"
# Path to the cache-file, which keeps track of recent posts, allowing you to limit posts per hours and
# retweet yourself
post_cache_path = base_path + "db/post.cache"
# Path to backup of database.
backup_path = base_path + "backup/" + "database.bak"
# Path for storing logs
log_path = base_path + "logs/"
# Path to folder for temporary storage of images
image_path = base_path + "images/"
# This file contains all necessary file and folder paths. Make sure to end folder paths with "/".
# base_path is the path from root to the lowest common denominator for all of the other paths.
# Using an absolute path is especially important if running via cron.
base_path = "./"
# Path to the database file. If you want it somewhere other than directly in the base path you can
# either write the entire path manually, or just add the rest of the path on top of the basePath.
database_path = base_path + "db/database.json"
# Path to the cache-file, which keeps track of recent posts, allowing you to limit posts per hours and
# retweet yourself
post_cache_path = base_path + "db/post.cache"
# Path to backup of database.
backup_path = base_path + "backup/" + "database.bak"
# Path for storing logs
log_path = base_path + "logs/"
# Path to folder for temporary storage of images
image_path = base_path + "images/"

View File

@@ -1,78 +1,71 @@
import os
# Enables/disables crossposting to twitter and mastodon
# Accepted values: True, False
Twitter = True
Mastodon = True
# log_level determines what messages will be written to the log.
# "error" means only error messages will be written to the log.
# "verbose" means all messages will be written to the log.
# "none" means no messages will be written to the log (not recommended).
# Accepted values: error, verbose, none
log_level = "verbose"
# visibility sets what visibility should be used when posting to Mastodon. Options are "public" for always public, "unlisted" for always unlisted,
# "private" for always private and "hybrid" for all posts public except responses in threads (meaning first post in a thread is public and the rest unlisted).
# Accepted values: public, private, hybrid
visibility = "hybrid"
# mentions set what is to be done with posts containing a mention of another user. Options are "ignore",
# for crossposting with no change, "skip" for skipping posts with mentions, "strip" for removing
# the starting @ of a username and "url" to replace the username with a link to their bluesky profile.
# Accepted values: ignore, skip, strip, url
mentions = "strip"
# post_default sets default posting mode. True means all posts will be crossposted unless otherwise specified,
# False means no posts will be crossposted unless explicitly specified. If no toggle (below) is specified
# post_default will be treated as True no matter what is set.
# Accepted values: True, False
post_default = True
# The function to select what posts are crossposted (mis)uses the language function in Bluesky.
# Enter a language here and all posts will be filtered based on if that language is included
# in the post.
# E.g. if you set post_default to True and add German ("de") as post toggle, all posts including
# German as a language will be skipped. If post_default is set to False, only posts including
# german will be crossposted. You can use different languages as selectors for Mastodon
# and Twitter. You can have both the actual language of the tweet, and the selector language
# added to the tweet and it will still work.
# Accepted values: Any language tag in quotes (https://en.wikipedia.org/wiki/IETF_language_tag)
mastodon_lang = ""
twitter_lang = ""
# quote_posts determines if quote reposts of other posts should be crossposted with the quoted post included as a link. If False these posts will be ignored.
quote_posts = True
# max_retries sets maximum amount of times poster will retry a failed crosspost.
# Accepted values: Integers greater than 0
max_retries = 5
# post_time_limit sets max time limit (in hours) for fetching posts. If no database exists, all posts within this time
# period will be posted.
# Accepted values: Integers greater than 0
post_time_limit = 12
# max_per_hour limits the amount of posts that can be crossposted withing an hour. 0 means no limit.
# Accepted values: Any integer
max_per_hour = 0
# overflow_posts determines what happens to posts that are not crossposted due to the hourly limit.
# If set to "retry" the poster will attempt to send them again when posts per hour are below the limit.
# If set to "skip" the posts will be skipped and the poster will instead continue on with new posts.
# Accepted values: retry, skip
overflow_posts = "retry"
# Override settings with environment variables if they exist
Twitter = os.environ.get('TWITTER_CROSSPOSTING').lower() == 'true' if os.environ.get('TWITTER_CROSSPOSTING') else Twitter
Mastodon = os.environ.get('MASTODON_CROSSPOSTING').lower() == 'true' if os.environ.get('MASTODON_CROSSPOSTING') else Mastodon
log_level = os.environ.get('LOG_LEVEL').lower() == 'true' if os.environ.get('LOG_LEVEL') else log_level
visibility = os.environ.get('MASTODON_VISIBILITY') if os.environ.get('MASTODON_VISIBILITY') else visibility
mentions = os.environ.get('MENTIONS') if os.environ.get('MENTIONS') else mentions
post_default = os.environ.get('POST_DEFAULT').lower() == 'true' if os.environ.get('POST_DEFAULT') else post_default
mastodon_lang = os.environ.get('MASTODON_LANG') if os.environ.get('MASTODON_LANG') else mastodon_lang
twitter_lang = os.environ.get('TWITTER_LANG') if os.environ.get('TWITTER_LANG') else twitter_lang
quote_posts = os.environ.get('QUOTE_POSTS') if os.environ.get('QUOTE_POSTS') else quote_posts
max_retries = int(os.environ.get('MAX_RETRIES')) if os.environ.get('MAX_RETRIES') else max_retries
post_time_limit = int(os.environ.get('POST_TIME_LIMIT')) if os.environ.get('POST_TIME_LIMIT') else post_time_limit
max_per_hour = int(os.environ.get('MAX_PER_HOUR')) if os.environ.get('MAX_PER_HOUR') else max_per_hour
overflow_posts = int(os.environ.get('OVERFLOW_POST')) if os.environ.get('OVERFLOW_POST') else overflow_posts
max_per_hour = 0
over_flow_posts = "retry"
import os
# Enables/disables crossposting to twitter and mastodon
# Accepted values: True, False
Twitter = True
Mastodon = True
# log_level determines what messages will be written to the log.
# "error" means only error messages will be written to the log.
# "verbose" means all messages will be written to the log.
# "none" means no messages will be written to the log (not recommended).
# Accepted values: error, verbose, none
log_level = "verbose"
# visibility sets what visibility should be used when posting to Mastodon. Options are "public" for always public, "unlisted" for always unlisted,
# "private" for always private and "hybrid" for all posts public except responses in threads (meaning first post in a thread is public and the rest unlisted).
# Accepted values: public, private, hybrid
visibility = "hybrid"
# mentions set what is to be done with posts containing a mention of another user. Options are "ignore",
# for crossposting with no change, "skip" for skipping posts with mentions, "strip" for removing
# the starting @ of a username and "url" to replace the username with a link to their bluesky profile.
# Accepted values: ignore, skip, strip, url
mentions = "strip"
# post_default sets default posting mode. True means all posts will be crossposted unless otherwise specified,
# False means no posts will be crossposted unless explicitly specified. If no toggle (below) is specified
# post_default will be treated as True no matter what is set.
# Accepted values: True, False
post_default = True
# The function to select what posts are crossposted (mis)uses the language function in Bluesky.
# Enter a language here and all posts will be filtered based on if that language is included
# in the post.
# E.g. if you set post_default to True and add German ("de") as post toggle, all posts including
# German as a language will be skipped. If post_default is set to False, only posts including
# german will be crossposted. You can use different languages as selectors for Mastodon
# and Twitter. You can have both the actual language of the tweet, and the selector language
# added to the tweet and it will still work.
# Accepted values: Any language tag in quotes (https://en.wikipedia.org/wiki/IETF_language_tag)
mastodon_lang = ""
twitter_lang = ""
# quote_posts determines if quote reposts of other posts should be crossposted with the quoted post included as a link. If False these posts will be ignored.
quote_posts = True
# max_retries sets maximum amount of times poster will retry a failed crosspost.
# Accepted values: Integers greater than 0
max_retries = 5
# post_time_limit sets max time limit (in hours) for fetching posts. If no database exists, all posts within this time
# period will be posted.
# Accepted values: Integers greater than 0
post_time_limit = 12
# max_per_hour limits the amount of posts that can be crossposted withing an hour. 0 means no limit.
# Accepted values: Any integer
max_per_hour = 0
# overflow_posts determines what happens to posts that are not crossposted due to the hourly limit.
# If set to "retry" the poster will attempt to send them again when posts per hour are below the limit.
# If set to "skip" the posts will be skipped and the poster will instead continue on with new posts.
# Accepted values: retry, skip
overflow_posts = "retry"
# Override settings with environment variables if they exist
Twitter = os.environ.get('TWITTER_CROSSPOSTING').lower() == 'true' if os.environ.get('TWITTER_CROSSPOSTING') else Twitter
Mastodon = os.environ.get('MASTODON_CROSSPOSTING').lower() == 'true' if os.environ.get('MASTODON_CROSSPOSTING') else Mastodon
log_level = os.environ.get('LOG_LEVEL').lower() == 'true' if os.environ.get('LOG_LEVEL') else log_level
visibility = os.environ.get('MASTODON_VISIBILITY') if os.environ.get('MASTODON_VISIBILITY') else visibility
mentions = os.environ.get('MENTIONS') if os.environ.get('MENTIONS') else mentions
post_default = os.environ.get('POST_DEFAULT').lower() == 'true' if os.environ.get('POST_DEFAULT') else post_default
mastodon_lang = os.environ.get('MASTODON_LANG') if os.environ.get('MASTODON_LANG') else mastodon_lang
twitter_lang = os.environ.get('TWITTER_LANG') if os.environ.get('TWITTER_LANG') else twitter_lang
quote_posts = os.environ.get('QUOTE_POSTS') if os.environ.get('QUOTE_POSTS') else quote_posts
max_retries = int(os.environ.get('MAX_RETRIES')) if os.environ.get('MAX_RETRIES') else max_retries
post_time_limit = int(os.environ.get('POST_TIME_LIMIT')) if os.environ.get('POST_TIME_LIMIT') else post_time_limit
max_per_hour = int(os.environ.get('MAX_PER_HOUR')) if os.environ.get('MAX_PER_HOUR') else max_per_hour
overflow_posts = int(os.environ.get('OVERFLOW_POST')) if os.environ.get('OVERFLOW_POST') else overflow_posts