#!/usr/bin/env python3
# SPDX-License-Identifier: EUPL-1.2

import re
import os
import sys
import html
import json
import shutil
import sqlite3
import argparse
import multiprocessing
from random import randint

import markovify
import pytomlpp as toml
from bs4 import BeautifulSoup


def arg_parser_factory(*, description):
    """Build an argument parser that understands ``-c`` / ``--cfg``.

    Args:
        description: Text passed to ``argparse.ArgumentParser`` as the
            program description.

    Returns:
        The configured ``argparse.ArgumentParser``. The parsed namespace
        exposes the config path as ``cfg`` (default ``'config.toml'``).
    """
    parser = argparse.ArgumentParser(description=description)
    parser.add_argument(
        '-c', '--cfg', dest='cfg', default='config.toml', nargs='?',
        help='Specify a custom location for the config file.'
    )
    return parser


def parse_args(*, description):
    """Parse the process's command-line arguments with the standard parser."""
    return arg_parser_factory(description=description).parse_args()


def load_config(cfg_path):
    """Load the defaults, overlay the user's TOML config, and validate it.

    Args:
        cfg_path: Path of the TOML config file to overlay on the defaults
            read from ``config.defaults.json``.

    Returns:
        The merged configuration dict.

    Raises:
        SystemExit: With status 1 (after printing an explanation to stderr)
            when ``site`` lacks an ``http://``/``https://`` prefix or when
            ``access_token`` is missing.
    """
    # TOML doesn't support null here so we have to use JSON 😒
    with open('config.defaults.json') as f:
        cfg = json.load(f)

    with open(cfg_path) as f:
        cfg.update(toml.load(f))

    if not cfg['site'].startswith(('https://', 'http://')):
        print("Site must begin with 'https://' or 'http://'. Value '{0}' is invalid - try 'https://{0}' instead.".format(cfg['site']), file=sys.stderr)
        sys.exit(1)

    if 'access_token' not in cfg:
        print('No authentication info', file=sys.stderr)
        print('Get a client id, client secret, and access token here: https://tinysubversions.com/notes/mastodon-bot/', file=sys.stderr)
        print('Then put `access_token` in your config file.', file=sys.stderr)
        sys.exit(1)

    return cfg


def make_sentence(output, cfg):
    """Generate one sentence from stored toots and send it through ``output``.

    Intended to run as the target of a child process (see ``make_toot``).
    Exactly one object is sent on ``output``: the generated sentence, an
    explanatory message when no toots are available, or ``None`` when no
    sentence could be produced within the retry budget.

    Args:
        output: Object with a ``send`` method (the writable end of a
            ``multiprocessing.Pipe``).
        cfg: Configuration dict. Keys read here: ``learn_from_cw``,
            ``ignored_cws``, ``overlap_ratio_enabled``, ``overlap_ratio``,
            ``limit_length``, ``length_lower_limit``, ``length_upper_limit``
            and ``mention_handling``.
    """
    class nlt_fixed(markovify.NewlineText):  # modified version of NewlineText that never rejects sentences
        def test_sentence_input(self, sentence):
            return True  # all sentences are valid <3

    # create a copy of the database because reply.py will be using the main one
    shutil.copyfile("toots.db", "toots-copy.db")
    try:
        db = sqlite3.connect("toots-copy.db")
        try:
            db.text_factory = str
            c = db.cursor()
            if cfg['learn_from_cw']:
                # One "?" placeholder per ignored content warning.
                ignored_cws_query_params = "(" + ",".join("?" * len(cfg["ignored_cws"])) + ")"
                query = (
                    "SELECT content FROM `toots` WHERE cw IS NULL OR CW NOT IN "
                    f"{ignored_cws_query_params} ORDER BY RANDOM() LIMIT 10000"
                )
                toots = c.execute(query, cfg["ignored_cws"]).fetchall()
            else:
                toots = c.execute("SELECT content FROM `toots` WHERE cw IS NULL ORDER BY RANDOM() LIMIT 10000").fetchall()
        finally:
            db.close()
    finally:
        # Always discard the temporary copy, including on the early-return
        # and error paths.
        os.remove("toots-copy.db")

    if not toots:
        output.send("Database is empty! Try running main.py.")
        return

    nlt = markovify.NewlineText if cfg['overlap_ratio_enabled'] else nlt_fixed
    model = nlt("\n".join([toot[0] for toot in toots]))

    # The word limit is drawn once and reused for every attempt.
    if cfg['limit_length']:
        max_words = randint(cfg['length_lower_limit'], cfg['length_upper_limit'])
    else:
        max_words = None
    max_overlap_ratio = cfg['overlap_ratio'] if cfg['overlap_ratio_enabled'] else 0.7

    sentence = None
    tries = 0
    while sentence is None and tries < 10:
        sentence = model.make_short_sentence(
            max_chars=500,
            tries=10000,
            max_overlap_ratio=max_overlap_ratio,
            max_words=max_words
        )
        tries += 1

    if sentence is None:
        # Nothing was generated; report that explicitly instead of crashing
        # in re.sub() below and leaving the receiver without any message.
        output.send(None)
        return

    # optionally remove mentions
    if cfg['mention_handling'] == 1:
        # strip only a mention-like token at the very start
        sentence = re.sub(r"^\S*@\u200B\S*\s?", "", sentence)
    elif cfg['mention_handling'] == 0:
        # strip every mention-like token
        sentence = re.sub(r"\S*@\u200B\S*\s?", "", sentence)

    output.send(sentence)


def make_toot(cfg):
    """Generate a toot in a child process, giving up after 5 seconds.

    Args:
        cfg: Configuration dict forwarded to ``make_sentence``.

    Returns:
        The text sent back by ``make_sentence``, or a fixed failure message
        when the child timed out, sent ``None``, or exited without sending
        anything.
    """
    toot = None
    pin, pout = multiprocessing.Pipe(False)
    p = multiprocessing.Process(target=make_sentence, args=[pout, cfg])
    try:
        p.start()
        p.join(5)  # wait 5 seconds to get something
        if p.is_alive():  # if it's still trying to make a toot after 5 seconds
            p.terminate()
            p.join()
        elif pin.poll():
            # Only read when data is actually waiting: this process still
            # holds the write end, so a bare recv() would block forever if
            # the child exited without sending.
            toot = pin.recv()
    finally:
        pin.close()
        pout.close()

    if toot is None:
        toot = 'Toot generation failed! Contact io@csdisaster.club for assistance.'
    return toot


# NOTE(review): extract_toot continues beyond this span; its opening is kept exactly as found.
def extract_toot(toot): toot = html.unescape(toot) # convert HTML escape codes to text soup = BeautifulSoup(toot, "html.parser") for lb in soup.select("br"): # replace
with linebreak lb.name = "\n" for p in soup.select("p"): # ditto for

p.name = "\n" for ht in soup.select("a.hashtag"): # convert hashtags from links to text ht.unwrap() for link in soup.select("a"): # convert