master
lza_menace 2 years ago
commit 2b7a367eda

13
.gitignore vendored

@ -0,0 +1,13 @@
.idea
data/music/*.jpg
data/music/*.webp
data/music/*.ogg*
data/cross.liq
data/db.sqlite3
data/liquidsoap.service
data/icecast.xml
data/radio_nginx.conf
data/soap.liq
__pycache__
.venv
settings.py

@ -0,0 +1,151 @@
# PhunkRadio
PhunkRadio is a fork of IRC!Radio made by my friend dsc_.
It is a radio station for Discord servers. You hang around
on Discord, adding YouTube songs to the bot, listening to it with
all your friends. Great fun!
The original repository can be found here: https://git.wownero.com/dsc/ircradio
### Stack
PhunkRadio aims to be minimalistic/small using:
- Python >= 3.7
- SQLite
- LiquidSoap >= 1.4.3
- Icecast2
- Quart web framework
## Command list
```text
- !np - current song
- !tune - upvote song
- !boo - downvote song
- !request - search and queue a song by title or YouTube id
- !dj+ - add a YouTube ID to the radiostream
- !dj- - remove a YouTube ID
- !ban+ - ban a YouTube ID and/or nickname
- !ban- - unban a YouTube ID and/or nickname
- !skip - skips current song
- !listeners - show current amount of listeners
- !queue - show queued up music
- !queue_user - queue a random song by user
- !search - search for a title
- !stats - stats
```
## Ubuntu installation
No docker. The following assumes you have a VPS somewhere with root access.
#### 1. Requirements
As `root`:
```
apt install -y liquidsoap icecast2 nginx python3-certbot-nginx python3-virtualenv libogg-dev ffmpeg sqlite3
ufw allow 80
ufw allow 443
```
When the installation asks for icecast2 configuration, skip it.
#### 2. Create system user
As `root`:
```text
adduser radio
```
#### 2. Clone this project
As `radio`:
```bash
su radio
cd ~/
git clone https://git.wownero.com/dsc/ircradio.git
cd ircradio/
virtualenv -p /usr/bin/python3 venv
source venv/bin/activate
pip install -r requirements.txt
```
#### 3. Generate some configs
```bash
cp settings.py_example settings.py
```
Look at `settings.py` and configure it to your liking:
- Change `icecast2_hostname` to your hostname, i.e: `radio.example.com`
- Change `irc_host`, `irc_port`, `irc_channels`, and `irc_admins_nicknames`
- Change the passwords under `icecast2_`
- Change the `liquidsoap_description` to whatever
When you are done, execute this command:
```bash
python run generate
```
This will write icecast2/liquidsoap/nginx configuration files into `data/`.
#### 4. Applying configuration
As `root`, copy the following files:
```bash
cp data/icecast.xml /etc/icecast2/
cp data/liquidsoap.service /etc/systemd/system/
cp data/radio_nginx.conf /etc/nginx/sites-enabled/
```
#### 5. Starting some stuff
As `root` 'enable' icecast2/liquidsoap/nginx, this is to
make sure these applications start when the server reboots.
```bash
sudo systemctl enable liquidsoap
sudo systemctl enable nginx
sudo systemctl enable icecast2
```
And start them:
```bash
sudo systemctl start icecast2
sudo systemctl start liquidsoap
```
Reload & start nginx:
```bash
systemctl reload nginx
sudo systemctl start nginx
```
### 6. Run the webif and IRC bot:
As `radio`, issue the following command:
```bash
python3 run webdev
```
Run it in `screen` or `tux` to keep it up, or write a systemd unit file for it.
### 7. Generate HTTPs certificate
```bash
certbot --nginx
```
Pick "Yes" for redirects.

File diff suppressed because it is too large Load Diff

@ -0,0 +1,4 @@
from ircradio.utils import liquidsoap_check_symlink
liquidsoap_check_symlink()

@ -0,0 +1,347 @@
from typing import List, Optional
import os
import time
import asyncio
import random
import discord
import settings
from ircradio.radio import Radio
from ircradio.youtube import YouTube
from ircradio.factory import discord_bot as bot
msg_queue = asyncio.Queue()
async def message_worker():
from ircradio.factory import app
while True:
try:
data: dict = await msg_queue.get()
target = data['target']
msg = data['message']
await target.send(msg)
except Exception as ex:
app.logger.error(f"message_worker(): {ex}")
await asyncio.sleep(0.3)
def start():
bot.loop.create_task(bot.start(settings.discord_token))
async def send_message(target: str, message: str):
await msg_queue.put({"target": target, "message": message})
@bot.event
async def on_ready():
print('We have logged in as {0.user}'.format(bot))
@bot.event
async def on_message(message):
from ircradio.factory import app
from ircradio.models import Ban
if message.author == bot.user:
return
msg = message.content
if not msg.startswith(settings.discord_command_prefix):
return
msg = msg[len(settings.discord_command_prefix):]
try:
if message.author not in settings.discord_admins:
banned = Ban.select().filter(utube_id_or_nick=message.author).get()
if banned:
return
except:
pass
data = {
"nick": str(message.author),
"target": message.channel
}
spl = msg.split(" ")
cmd = spl[0].strip()
spl = spl[1:]
if cmd.endswith("+") or cmd.endswith("-"):
spl.insert(0, cmd[-1])
cmd = cmd[:-1]
if cmd in Commands.LOOKUP and hasattr(Commands, cmd):
attr = getattr(Commands, cmd)
try:
await attr(*spl, **data)
except Exception as ex:
app.logger.error(f"message_worker(): {ex}")
pass
class Commands:
LOOKUP = ['np', 'tune', 'boo', 'request', 'dj',
'skip', 'listeners', 'queue',
'queue_user', 'pop', 'search', 'stats',
'rename', 'ban', 'whoami', 'hello']
@staticmethod
async def hello(*args, target=None, nick=None, **kwards):
"""say hi"""
hi = f"Why hello there, {nick}!"
await send_message(target=target, message=hi)
@staticmethod
async def np(*args, target=None, nick=None, **kwargs):
"""current song"""
history = Radio.history()
if not history:
return await send_message(target, f"Nothing is playing?!")
song = history[0]
np = f"Now playing: {song.title} (rating: {song.karma}/10; submitter: {song.added_by}; id: {song.utube_id})"
await send_message(target=target, message=np)
@staticmethod
async def tune(*args, target=None, nick=None, **kwargs):
"""upvote song"""
history = Radio.history()
if not history:
return await send_message(target, f"Nothing is playing?!")
song = history[0]
if song.karma <= 9:
song.karma += 1
song.save()
msg = f"Rating for \"{song.title}\" is {song.karma}/10 .. PARTY ON!!!!"
await send_message(target=target, message=msg)
@staticmethod
async def boo(*args, target=None, nick=None, **kwargs):
"""downvote song"""
history = Radio.history()
if not history:
return await send_message(target, f"Nothing is playing?!")
song = history[0]
if song.karma >= 1:
song.karma -= 1
song.save()
msg = f"Rating for \"{song.title}\" is {song.karma}/10 .. BOOO!!!!"
await send_message(target=target, message=msg)
@staticmethod
async def request(*args, target=None, nick=None, **kwargs):
"""request a song by title or YouTube id"""
from ircradio.models import Song
if not args:
send_message(target=target, message="usage: !request <id>")
needle = " ".join(args)
try:
songs = Song.search(needle)
except Exception as ex:
return await send_message(target, f"{ex}")
if not songs:
return await send_message(target, "Not found!")
if len(songs) >= 2:
random.shuffle(songs)
await send_message(target, "Multiple found:")
for s in songs[:4]:
await send_message(target, f"{s.utube_id} | {s.title}")
return
song = songs[0]
msg = f"Added {song.title} to the queue"
Radio.queue(song)
return await send_message(target, msg)
@staticmethod
async def search(*args, target=None, nick=None, **kwargs):
"""search for a title"""
from ircradio.models import Song
if not args:
return await send_message(target=target, message="usage: !search <id>")
needle = " ".join(args)
songs = Song.search(needle)
if not songs:
return await send_message(target, "No song(s) found!")
if len(songs) == 1:
song = songs[0]
await send_message(target, f"{song.utube_id} | {song.title}")
else:
random.shuffle(songs)
await send_message(target, "Multiple found:")
for s in songs[:4]:
await send_message(target, f"{s.utube_id} | {s.title}")
@staticmethod
async def dj(*args, target=None, nick=None, **kwargs):
"""add (or remove) a YouTube ID to the radiostream"""
from ircradio.models import Song
if not args or args[0] not in ["-", "+"]:
return await send_message(target, "usage: dj+ <youtube_id>")
add: bool = args[0] == "+"
utube_id = args[1]
if not YouTube.is_valid_uid(utube_id):
return await send_message(target, "YouTube ID not valid.")
if add:
try:
await send_message(target, f"Scheduled download for '{utube_id}'")
song = await YouTube.download(utube_id, added_by=nick)
await send_message(target, f"'{song.title}' added")
except Exception as ex:
return await send_message(target, f"Download '{utube_id}' failed; {ex}")
else:
try:
Song.delete_song(utube_id)
await send_message(target, "Press F to pay respects.")
except Exception as ex:
await send_message(target, f"Failed to remove {utube_id}; {ex}")
@staticmethod
async def skip(*args, target=None, nick=None, **kwargs):
"""skips current song"""
from ircradio.factory import app
try:
Radio.skip()
except Exception as ex:
app.logger.error(f"{ex}")
return await send_message(target=target, message="Error")
await send_message(target, message="Song skipped. Booo! >:|")
@staticmethod
async def listeners(*args, target=None, nick=None, **kwargs):
"""current amount of listeners"""
from ircradio.factory import app
try:
listeners = await Radio.listeners()
if listeners:
msg = f"{listeners} client"
if listeners >= 2:
msg += "s"
msg += " connected"
return await send_message(target, msg)
return await send_message(target, f"no listeners, much sad :((")
except Exception as ex:
app.logger.error(f"{ex}")
await send_message(target=target, message="Error")
@staticmethod
async def queue(*args, target=None, nick=None, **kwargs):
"""show currently queued tracks"""
from ircradio.models import Song
q: List[Song] = Radio.queues()
if not q:
return await send_message(target, "queue empty")
for i, s in enumerate(q):
await send_message(target, f"{s.utube_id} | {s.title}")
if i >= 12:
await send_message(target, "And some more...")
@staticmethod
async def rename(*args, target=None, nick=None, **kwargs):
from ircradio.models import Song
try:
utube_id = args[0]
title = " ".join(args[1:])
if not utube_id or not title or not YouTube.is_valid_uid(utube_id):
raise Exception("bad input")
except:
return await send_message(target, "usage: !rename <id> <new title>")
try:
song = Song.select().where(Song.utube_id == utube_id).get()
if not song:
raise Exception("Song not found")
except Exception as ex:
return await send_message(target, "Song not found.")
if song.added_by != nick and nick not in settings.irc_admins_nicknames:
return await send_message(target, "You may only rename your own songs.")
try:
Song.update(title=title).where(Song.utube_id == utube_id).execute()
except Exception as ex:
return await send_message(target, "Rename failure.")
await send_message(target, "Song renamed.")
@staticmethod
async def queue_user(*args, target=None, nick=None, **kwargs):
"""queue random song by username"""
from ircradio.models import Song
added_by = args[0]
try:
q = Song.select().where(Song.added_by ** f"%{added_by}%")
songs = [s for s in q]
except:
return await send_message(target, "No results.")
for i in range(0, 5):
song = random.choice(songs)
if Radio.queue(song):
return await send_message(target, f"A random {added_by} has appeared in the queue: {song.title}")
await send_message(target, "queue_user exhausted!")
@staticmethod
async def stats(*args, target=None, nick=None, **kwargs):
"""random stats"""
songs = 0
try:
from ircradio.models import db
cursor = db.execute_sql('select count(*) from song;')
res = cursor.fetchone()
songs = res[0]
except:
pass
disk = os.popen(f"du -h {settings.dir_music}").read().split("\t")[0]
await send_message(target, f"Songs: {songs} | Disk: {disk}")
@staticmethod
async def ban(*args, target=None, nick=None, **kwargs):
"""add (or remove) a YouTube ID ban (admins only)"""
if nick not in settings.irc_admins_nicknames:
await send_message(target, "You need to be an admin.")
return
from ircradio.models import Song, Ban
if not args or args[0] not in ["-", "+"]:
return await send_message(target, "usage: ban+ <youtube_id or nickname>")
try:
add: bool = args[0] == "+"
arg = args[1]
except:
return await send_message(target, "usage: ban+ <youtube_id or nickname>")
if add:
Ban.create(utube_id_or_nick=arg)
else:
Ban.delete().where(Ban.utube_id_or_nick == arg).execute()
await send_message(target, "Redemption")
@staticmethod
async def whoami(*args, target=None, nick=None, **kwargs):
if nick in settings.discord_admins:
await send_message(target, "admin")
else:
await send_message(target, "user")

@ -0,0 +1,86 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2021, dsc@xmr.pm
from typing import List, Optional
import os
import logging
import asyncio
import discord
from quart import Quart
import settings
from ircradio.radio import Radio
from ircradio.utils import print_banner
from ircradio.youtube import YouTube
import ircradio.models
app = None
user_agents: List[str] = None
websocket_sessions = set()
download_queue = asyncio.Queue()
discord_bot = None
soap = Radio()
# icecast2 = IceCast2()
async def download_thing():
global download_queue
a = await download_queue.get()
e = 1
async def _setup_icecast2(app: Quart):
global icecast2
await icecast2.write_config()
async def _setup_database(app: Quart):
import peewee
models = peewee.Model.__subclasses__()
for m in models:
m.create_table()
async def _setup_discord(app: Quart):
global discord_bot
loop = asyncio.get_event_loop()
discord_bot = discord.Client(loop=loop)
from ircradio.disco import start, message_worker
start()
asyncio.create_task(message_worker())
async def _setup_user_agents(app: Quart):
global user_agents
with open(os.path.join(settings.cwd, 'data', 'agents.txt'), 'r') as f:
user_agents = [l.strip() for l in f.readlines() if l.strip()]
async def _setup_requirements(app: Quart):
ls_reachable = soap.liquidsoap_reachable()
if not ls_reachable:
raise Exception("liquidsoap is not running, please start it first")
def create_app():
global app, soap, icecast2
app = Quart(__name__)
app.logger.setLevel(logging.INFO)
@app.before_serving
async def startup():
await _setup_requirements(app)
await _setup_database(app)
await _setup_user_agents(app)
await _setup_discord(app)
import ircradio.routes
from ircradio.youtube import YouTube
asyncio.create_task(YouTube.update_loop())
print_banner()
return app

@ -0,0 +1,127 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2021, dsc@xmr.pm
import os
import re
from typing import Optional, List
from datetime import datetime
import mutagen
from peewee import SqliteDatabase, SQL
import peewee as pw
from ircradio.youtube import YouTube
import settings
db = SqliteDatabase(f"{settings.cwd}/data/db.sqlite3")
class Ban(pw.Model):
id = pw.AutoField()
utube_id_or_nick = pw.CharField(index=True)
class Meta:
database = db
class Song(pw.Model):
id = pw.AutoField()
date_added = pw.DateTimeField(default=datetime.now)
title = pw.CharField(index=True)
utube_id = pw.CharField(index=True, unique=True)
added_by = pw.CharField(index=True, constraints=[SQL('COLLATE NOCASE')]) # ILIKE index
duration = pw.IntegerField()
karma = pw.IntegerField(default=5, index=True)
banned = pw.BooleanField(default=False)
@staticmethod
def delete_song(utube_id: str) -> bool:
from ircradio.factory import app
try:
fn = f"{settings.dir_music}/{utube_id}.ogg"
Song.delete().where(Song.utube_id == utube_id).execute()
os.remove(fn)
except Exception as ex:
app.logger.error(f"{ex}")
return False
@staticmethod
def search(needle: str, min_chars=3) -> List['Song']:
needle = needle.replace("%", "")
if len(needle) < min_chars:
raise Exception("Search too short. Wow. More typing plz. Much effort.")
if YouTube.is_valid_uid(needle):
try:
song = Song.select().filter(Song.utube_id == needle).get()
return [song]
except:
pass
try:
q = Song.select().filter(Song.title ** f"%{needle}%")
return [s for s in q]
except:
pass
return []
@staticmethod
def by_uid(uid: str) -> Optional['Song']:
try:
return Song.select().filter(Song.utube_id == uid).get()
except:
pass
@staticmethod
def from_filepath(filepath: str) -> Optional['Song']:
fn = os.path.basename(filepath)
name, ext = fn.split(".", 1)
if not YouTube.is_valid_uid(name):
raise Exception("invalid youtube id")
try:
return Song.select().filter(utube_id=name).get()
except:
return Song.auto_create_from_filepath(filepath)
@staticmethod
def auto_create_from_filepath(filepath: str) -> Optional['Song']:
from ircradio.factory import app
fn = os.path.basename(filepath)
uid, ext = fn.split(".", 1)
if not YouTube.is_valid_uid(uid):
raise Exception("invalid youtube id")
metadata = YouTube.metadata_from_filepath(filepath)
if not metadata:
return
app.logger.info(f"auto-creating for {fn}")
try:
song = Song.create(
duration=metadata['duration'],
title=metadata['name'],
added_by='radio',
karma=5,
utube_id=uid)
return song
except Exception as ex:
app.logger.error(f"{ex}")
pass
@property
def filepath(self):
"""Absolute"""
return os.path.join(settings.dir_music, f"{self.utube_id}.ogg")
@property
def filepath_noext(self):
"""Absolute filepath without extension ... maybe"""
try:
return os.path.splitext(self.filepath)[0]
except:
return self.filepath
class Meta:
database = db

@ -0,0 +1,168 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2021, dsc@xmr.pm
import re
import os
import socket
from typing import List, Optional, Dict
import asyncio
import sys
import settings
from ircradio.models import Song
from ircradio.utils import httpget
from ircradio.youtube import YouTube
class Radio:
@staticmethod
def queue(song: Song) -> bool:
from ircradio.factory import app
queues = Radio.queues()
queues_filepaths = [s.filepath for s in queues]
if song.filepath in queues_filepaths:
app.logger.info(f"already added to queue: {song.filepath}")
return False
Radio.command(f"requests.push {song.filepath}")
return True
@staticmethod
def skip() -> None:
Radio.command(f"{settings.liquidsoap_iface}.skip")
@staticmethod
def queues() -> Optional[List[Song]]:
"""get queued songs"""
from ircradio.factory import app
queues = Radio.command(f"requests.queue")
try:
queues = [q for q in queues.split(b"\r\n") if q != b"END" and q]
if not queues:
return []
queues = [q.decode() for q in queues[0].split(b" ")]
except Exception as ex:
app.logger.error(str(ex))
raise Exception("Error")
paths = []
for request_id in queues:
meta = Radio.command(f"request.metadata {request_id}")
path = Radio.filenames_from_strlist(meta.decode(errors="ignore").split("\n"))
if path:
paths.append(path[0])
songs = []
for fn in list(dict.fromkeys(paths)):
try:
song = Song.from_filepath(fn)
if not song:
continue
songs.append(song)
except Exception as ex:
app.logger.warning(f"skipping {fn}; file not found or something: {ex}")
# remove the now playing song from the queue
now_playing = Radio.now_playing()
if songs and now_playing:
if songs[0].filepath == now_playing.filepath:
songs = songs[1:]
return songs
@staticmethod
async def get_icecast_metadata() -> Optional[Dict]:
from ircradio.factory import app
# http://127.0.0.1:24100/status-json.xsl
url = f"http://{settings.icecast2_bind_host}:{settings.icecast2_bind_port}"
url = f"{url}/status-json.xsl"
try:
blob = await httpget(url, json=True)
if not isinstance(blob, dict) or "icestats" not in blob:
raise Exception("icecast2 metadata not dict")
return blob["icestats"].get('source')
except Exception as ex:
app.logger.error(f"{ex}")
@staticmethod
def history() -> Optional[List[Song]]:
# 0 = currently playing
from ircradio.factory import app
try:
status = Radio.command(f"{settings.liquidsoap_iface}.metadata")
status = status.decode(errors="ignore")
except Exception as ex:
app.logger.error(f"{ex}")
raise Exception("failed to contact liquidsoap")
try:
# paths = re.findall(r"filename=\"(.*)\"", status)
paths = Radio.filenames_from_strlist(status.split("\n"))
# reverse, limit
paths = paths[::-1][:5]
songs = []
for fn in list(dict.fromkeys(paths)):
try:
song = Song.from_filepath(fn)
if not song:
continue
songs.append(song)
except Exception as ex:
app.logger.warning(f"skipping {fn}; file not found or something: {ex}")
except Exception as ex:
app.logger.error(f"{ex}")
app.logger.error(f"liquidsoap status:\n{status}")
raise Exception("error parsing liquidsoap status")
return songs
@staticmethod
def command(cmd: str) -> bytes:
"""via LiquidSoap control port"""
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
sock.connect((settings.liquidsoap_host, settings.liquidsoap_port))
sock.sendall(cmd.encode() + b"\n")
data = sock.recv(4096*1000)
sock.close()
return data
@staticmethod
def liquidsoap_reachable():
from ircradio.factory import app
try:
Radio.command("help")
except Exception as ex:
app.logger.error("liquidsoap not reachable")
return False
return True
@staticmethod
def now_playing():
try:
now_playing = Radio.history()
if now_playing:
return now_playing[0]
except:
pass
@staticmethod
async def listeners():
data: dict = await Radio.get_icecast_metadata()
if not data:
return 0
return data.get('listeners', 0)
@staticmethod
def filenames_from_strlist(strlist: List[str]) -> List[str]:
paths = []
for line in strlist:
if not line.startswith("filename"):
continue
line = line[10:]
fn = line[:-1]
if not os.path.exists(fn):
continue
paths.append(fn)
return paths

@ -0,0 +1,133 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2021, dsc@xmr.pm
from datetime import datetime
from typing import Tuple, Optional
from quart import request, render_template, abort, jsonify
import asyncio
import json
import settings
from ircradio.factory import app
from ircradio.radio import Radio
@app.route("/")
async def root():
return await render_template("index.html", settings=settings)
history_cache: Optional[Tuple] = None
@app.route("/history.txt")
async def history():
global history_cache
now = datetime.now()
if history_cache:
if (now - history_cache[0]).total_seconds() <= 5:
print("from cache")
return history_cache[1]
history = Radio.history()
if not history:
return "no history"
data = ""
for i, s in enumerate(history[:10]):
data += f"{i+1}) <a target=\"_blank\" href=\"https://www.youtube.com/watch?v={s.utube_id}\">{s.utube_id}</a>; {s.title} <br>"
history_cache = [now, data]
return data
@app.route("/search")
async def search():
# search json api endpoint
# e.g: /search?name=test&limit=5&offset=0
if not settings.enable_search_route:
abort(404)
from ircradio.models import Song
name = request.args.get("name")
limit = request.args.get("limit", '20')
offset = request.args.get("offset", '0')
try:
limit = int(limit)
offset = int(offset)
except:
limit = 50
offset = 0
if not name or len(name) <= 2:
abort(404)
if limit > 50:
limit = 50
name = f"%{name}%"
try:
q = Song.select()
q = q.where((Song.added_by ** name) | (Song.title ** name))
q = q.order_by(Song.date_added.desc())
q = q.limit(limit).offset(offset)
results = [{
"added_by": s.added_by,
"karma": s.karma,
"id": s.id,
"title": s.title,
"utube_id": s.utube_id,
"date_added": s.date_added.strftime("%Y-%m-%d")
} for s in q]
except:
return jsonify([])
return jsonify(results)
@app.route("/library")
async def user_library():
from ircradio.models import Song
name = request.args.get("name")
if not name:
abort(404)
try:
by_date = Song.select().filter(Song.added_by == name)\
.order_by(Song.date_added.desc())
except:
by_date = []
if not by_date:
abort(404)
try:
by_karma = Song.select().filter(Song.added_by == name)\
.order_by(Song.karma.desc())
except:
by_karma = []
return await render_template("library.html", name=name, by_date=by_date, by_karma=by_karma)
@app.websocket("/ws")
async def np():
last_song = ""
while True:
"""get current song from history"""
history = Radio.history()
val = ""
if not history:
val = f"Nothing is playing?!"
else:
song = history[0]
val = song.title
if val != last_song:
data = json.dumps({"now_playing": val})
await websocket.send(f"{data}")
last_song = val
await asyncio.sleep(5)

@ -0,0 +1,79 @@
// tracks input in 'search' field (id=general)
let input_so_far = "";
// cached song list and cached queries
var queries = [];
var songs = new Map([]);
// track async fetch and processing
var returned = false;
$("#general").keyup( function() {
input_so_far = document.getElementsByName("general")[0].value;
if (input_so_far.length < 3) {
$("#table tr").remove();
return
};
if (!queries.includes(input_so_far.toLowerCase() ) ) {
queries.push(input_so_far.toLowerCase() );
returned = false;
const sanitized_input = encodeURIComponent( input_so_far );
const url = 'https://' + document.domain + ':' + location.port + '/search?name=' + sanitized_input + '&limit=15&offset=0'
const LoadData = async () => {
try {
const res = await fetch(url);
console.log("Status code 200 or similar: " + res.ok);
const data = await res.json();
return data;
} catch(err) {
console.error(err)
}
};
LoadData().then(newSongsJson => {
newSongsJson.forEach( (new_song) => {
let already_have = false;
songs.forEach( (_v, key) => {
if (new_song.id == key) { already_have = true; return; };
})
if (!already_have) { songs.set(new_song.utube_id, new_song) }
})
}).then( () => { returned = true } );
};
function renderTable () {
if (returned) {
$("#table tr").remove();
var filtered = new Map(
[...songs]
.filter(([k, v]) =>
( v.title.toLowerCase().includes( input_so_far.toLowerCase() ) ) ||
( v.added_by.toLowerCase().includes( input_so_far.toLowerCase() ) ) )
);
filtered.forEach( (song) => {
let added = song.added_by;
let added_link = '<a href="/library?name=' + added + '" target="_blank" rel="noopener noreferrer">' + added + '</a>';
let title = song.title;
let id = song.utube_id;
let id_link = '<a href="https://www.youtube.com/watch?v=' + id + '" target="_blank" rel="noopener noreferrer">' + id + '</a>';
$('#table tbody').append('<tr><td>'+id_link+'</td><td>'+added_link+'</td><td>'+title+'</td></tr>')
})
} else {
setTimeout(renderTable, 30); // try again in 30 milliseconds
}
};
renderTable();
});

@ -0,0 +1,17 @@
[Unit]
Description={{ description }}
After=network-online.target
Wants=network-online.target
[Service]
User={{ user }}
Group={{ group }}
Environment="{{ env }}"
StateDirectory={{ name | lower }}
LogsDirectory={{ name | lower }}
Type=simple
ExecStart={{ path_executable }} {{ args_executable }}
Restart=always
[Install]
WantedBy=multi-user.target

@ -0,0 +1,65 @@
<!DOCTYPE html>
<html lang="en">
<!--
░░░░░░░█▐▓▓░████▄▄▄█▀▄▓▓▓▌█ very website
░░░░░▄█▌▀▄▓▓▄▄▄▄▀▀▀▄▓▓▓▓▓▌█
░░░▄█▀▀▄▓█▓▓▓▓▓▓▓▓▓▓▓▓▀░▓▌█
░░█▀▄▓▓▓███▓▓▓███▓▓▓▄░░▄▓▐█▌ such html
░█▌▓▓▓▀▀▓▓▓▓███▓▓▓▓▓▓▓▄▀▓▓▐█
▐█▐██▐░▄▓▓▓▓▓▀▄░▀▓▓▓▓▓▓▓▓▓▌█▌ WOW
█▌███▓▓▓▓▓▓▓▓▐░░▄▓▓███▓▓▓▄▀▐█
█▐█▓▀░░▀▓▓▓▓▓▓▓▓▓██████▓▓▓▓▐█
▌▓▄▌▀░▀░▐▀█▄▓▓██████████▓▓▓▌█▌
▌▓▓▓▄▄▀▀▓▓▓▀▓▓▓▓▓▓▓▓█▓█▓█▓▓▌█▌ many music
█▐▓▓▓▓▓▓▄▄▄▓▓▓▓▓▓█▓█▓█▓█▓▓▓▐█
-->
<head>
<meta charset="UTF-8">
<meta http-equiv="X-UA-Compatible" content="IE=edge">
<meta name="viewport" content="width=device-width, initial-scale=1">
<meta name="HandheldFriendly" content="True">
<meta name="MobileOptimized" content="320">
<meta name="viewport" content="width=device-width, initial-scale=1"/>
<meta name="theme-color" content="#ffffff">
<meta name="apple-mobile-web-app-title" content="IRC!Radio">
<meta name="application-name" content="IRC!Radio">
<meta name="msapplication-TileColor" content="#da532c">
<meta name="description" content="IRC!Radio"/>
<title>PhunkRadio</title>
<link rel="stylesheet" href="https://maxcdn.bootstrapcdn.com/bootstrap/4.5.2/css/bootstrap.min.css">
{% if ENABLE_SEARCH_ROUTE %}
<link rel="stylesheet" href="https://code.jquery.com/ui/1.12.1/themes/base/jquery-ui.css"></link>
<script src="https://code.jquery.com/jquery-3.6.0.min.js" integrity="sha256-/xUj+3OJU5yExlq6GSYGSHk7tPXikynS7ogEvDej/m4=" crossorigin="anonymous"></script>
<script src="https://code.jquery.com/ui/1.12.1/jquery-ui.min.js" integrity="sha256-VazP97ZCwtekAsvgPBSUwPFKdrwD3unUfSGVYrahUqU=" crossorigin="anonymous"></script>
{% endif %}
<script>
var ws = new WebSocket('wss://' + document.domain + ':' + location.port + '/ws');
ws.onmessage = function (event) {
// console.log(event.data);
json = JSON.parse(event.data);
np = json.now_playing;
document.querySelector("#now_playing").innerText = np;
// console.log(np);
return false;
};
ws.onclose = function(event) {
console.log("WebSocket is closed now.");
};
</script>
</head>
<body>
{% block content %} {% endblock %}
</body>
</html>

@ -0,0 +1,75 @@
# Crossfade between tracks,
# taking the respective volume levels
# into account in the choice of the
# transition.
# @category Source / Track Processing
# @param ~start_next Crossing duration, if any.
# @param ~fade_in Fade-in duration, if any.
# @param ~fade_out Fade-out duration, if any.
# @param ~width Width of the volume analysis window.
# @param ~conservative Always prepare for
# a premature end-of-track.
# @param s The input source.
def smart_crossfade (~start_next=5.,~fade_in=3.,
~fade_out=3., ~width=2.,
~conservative=false,s)
high = -20.
medium = -32.
margin = 4.
fade.out = fade.out(type="sin",duration=fade_out)
fade.in = fade.in(type="sin",duration=fade_in)
add = fun (a,b) -> add(normalize=false,[b,a])
log = log(label="smart_crossfade")
def transition(a,b,ma,mb,sa,sb)
list.iter(fun(x)->
log(level=4,"Before: #{x}"),ma)
list.iter(fun(x)->
log(level=4,"After : #{x}"),mb)
if
# If A and B and not too loud and close,
# fully cross-fade them.
a <= medium and
b <= medium and
abs(a - b) <= margin
then
log("Transition: crossed, fade-in, fade-out.")
add(fade.out(sa),fade.in(sb))
elsif
# If B is significantly louder than A,
# only fade-out A.
# We don't want to fade almost silent things,
# ask for >medium.
b >= a + margin and a >= medium and b <= high
then
log("Transition: crossed, fade-out.")
add(fade.out(sa),sb)
elsif
# Do not fade if it's already very low.
b >= a + margin and a <= medium and b <= high
then
log("Transition: crossed, no fade-out.")
add(sa,sb)
elsif
# Opposite as the previous one.
a >= b + margin and b >= medium and a <= high
then
log("Transition: crossed, fade-in.")
add(sa,fade.in(sb))
# What to do with a loud end and
# a quiet beginning ?
# A good idea is to use a jingle to separate
# the two tracks, but that's another story.
else
# Otherwise, A and B are just too loud
# to overlap nicely, or the difference
# between them is too large and
# overlapping would completely mask one
# of them.
log("No transition: just sequencing.")
sequence([sa, sb])
end
end
cross(width=width, duration=start_next,
conservative=conservative,
transition,s)
end

@ -0,0 +1,53 @@
<icecast>
<location>Somewhere</location>
<admin>my@email.tld</admin>
<limits>
<clients>32</clients>
<sources>2</sources>
<queue-size>524288</queue-size>
<client-timeout>30</client-timeout>
<header-timeout>15</header-timeout>
<source-timeout>10</source-timeout>
<burst-on-connect>0</burst-on-connect>
<burst-size>65535</burst-size>
</limits>
<authentication>
<source-password>{{ source_password }}</source-password>
<relay-password>{{ relay_password }}</relay-password> <!-- for livestreams -->
<admin-user>admin</admin-user>
<admin-password>{{ admin_password }}</admin-password>
</authentication>
<hostname>{{ hostname }}</hostname>
<listen-socket>
<bind-address>{{ icecast2_bind_host }}</bind-address>
<port>{{ icecast2_bind_port }}</port>
</listen-socket>
<http-headers>
<header name="Access-Control-Allow-Origin" value="*" />
</http-headers>
<fileserve>1</fileserve>
<paths>
<basedir>/usr/share/icecast2</basedir>
<logdir>{{ log_dir }}</logdir>
<webroot>/usr/share/icecast2/web</webroot>
<adminroot>/usr/share/icecast2/admin</adminroot>
</paths>
<logging>
<accesslog>icecast2_access.log</accesslog>
<errorlog>icecast2_error.log</errorlog>
<loglevel>3</loglevel> <!-- 4 Debug, 3 Info, 2 Warn, 1 Error -->
<logsize>10000</logsize> <!-- Max size of a logfile -->
</logging>
<security>
<chroot>0</chroot>
</security>
</icecast>

@ -0,0 +1,93 @@
{% extends "base.html" %}
{% block content %}
<!-- Page Content -->
<div class="container">
<div class="row">
<!-- Post Content Column -->
<div class="col-lg-12">
<!-- Title -->
<h1 class="mt-4" style="margin-bottom: 2rem;">
PhunkRadio
</h1>
<p>Enjoy the music :)</p>
<hr>
<audio controls src="/{{ settings.icecast2_mount }}">Your browser does not support the<code>audio</code> element.</audio>
<p> </p>
<h5>Now playing: </h5>
<div id="now_playing">Nothing here yet</div>
<hr>
<h4>Command list:</h4>
<pre style="font-size:12px;">!np - current song
!tune - upvote song
!boo - downvote song
!request - search and queue a song by title or YouTube id
!dj+ - add a YouTube ID to the radiostream
!dj- - remove a YouTube ID
!ban+ - ban a YouTube ID and/or nickname
!ban- - unban a YouTube ID and/or nickname
!skip - skips current song
!listeners - show current amount of listeners
!queue - show queued up music
!queue_user - queue a random song by user
!search - search for a title
!stats - stats
</pre>
<hr>
<div class="row">
<div class="col-md-3">
<h4>History</h4>
<a href="/history.txt">history.txt</a>
</div>
<div class="col-md-5">
<h4>Library
<small style="font-size:12px">(by user)</small>
</h4>
<form method="GET" action="/library">
<div class="input-group mb-3 style=no-gutters">
<input type="text" class="form-control" id="name" name="name" placeholder="username...">
<div class="input-group-append">
<input class="btn btn-outline-secondary" type="submit" value="Search">
</div>
</div>
</form>
</div>
</div>
{% if ENABLE_SEARCH_ROUTE %}
<hr>
<div class="row">
<div class="col-md-8">
<h4>Quick Search
<small style="font-size:12px">(general)</small>
</h4>
<div class="input-group mb-3">
<input type="text" class="form-control" id="general" name="general" placeholder="query...">
</div>
</div>
<div class="col-md-12">
<table class="table table-sm table-hover table-bordered" id="table" style="font-size:12px">
<thead>
<tbody style="">
</tbody>
</thead>
</table>
</div>
</div>
{% endif %}
<hr>
<h4>IRC</h4>
<pre>{{ settings.irc_host }}:{{ settings.irc_port }}
{{ settings.irc_channels | join(" ") }}
</pre>
</div>
</div>
</div>
{% if ENABLE_SEARCH_ROUTE %}
<script src="static/search.js"></script>
{% endif %}
{% endblock %}

@ -0,0 +1,31 @@
{% extends "base.html" %}
{% block content %}
<!-- Page Content -->
<div class="container">
<div class="row">
<div class="col-lg-12">
<h1 class="mt-4" style="margin-bottom: 2rem;">
Library for {{ name }}
</h1>
<div class="row">
<div class="col-lg-6">
<h5>By date</h5>
<pre style="font-size:12px;">{% for s in by_date %}<a target="_blank" href="https://www.youtube.com/watch?v={{s.utube_id}}">{{s.utube_id}}</a> {{s.title}}
{% endfor %}</pre>
</div>
<div class="col-lg-6">
<h5>By karma</h5>
<pre style="font-size:12px;">{% for s in by_karma %}<a target="_blank" href="https://www.youtube.com/watch?v={{s.utube_id}}">{{s.utube_id}}</a> {{s.karma}} - {{s.title}}
{% endfor %}</pre>
</div>
</div>
<a href="/">
<button type="button" class="btn btn-primary btn-sm">Go back</button>
</a>
</div>
</div>
</div>
{% endblock %}

@ -0,0 +1,56 @@
server {
listen 80;
server_name {{ hostname }};
root /var/www/html;
access_log /dev/null;
error_log /var/log/nginx/radio_error;
client_max_body_size 120M;
fastcgi_read_timeout 1600;
proxy_read_timeout 1600;
index index.html;
error_page 403 /403.html;
location = /403.html {
root /var/www/html;
allow all;
internal;
}
location '/.well-known/acme-challenge' {
default_type "text/plain";
root /tmp/letsencrypt;
autoindex on;
}
location / {
root /var/www/html/;
proxy_pass http://{{ host }}:{{ port }};
proxy_redirect off;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
proxy_set_header X-Forwarded-Proto $scheme;
allow all;
}
location /{{ icecast2_mount }} {
allow all;
add_header 'Access-Control-Allow-Origin' '*';
proxy_pass http://{{ icecast2_bind_host }}:{{ icecast2_bind_port }}/{{ icecast2_mount }};
proxy_redirect off;
proxy_set_header Host $host;
proxy_set_header X-Real-IP $remote_addr;
proxy_set_header X-Forwarded-For $remote_addr;
proxy_set_header X-Forwarded-Proto $scheme;
}
location /ws {
proxy_pass http://{{ host }}:{{ port }}/ws;
proxy_http_version 1.1;
proxy_set_header Upgrade $http_upgrade;
proxy_set_header Connection "Upgrade";
proxy_set_header Host $host;
}
}

@ -0,0 +1,52 @@
#!/usr/bin/liquidsoap
set("log.stdout", true)
set("log.file",false)
%include "cross.liq"
# Allow requests from Telnet (Liquidsoap Requester)
set("server.telnet", true)
set("server.telnet.bind_addr", "{{ liquidsoap_host }}")
set("server.telnet.port", {{ liquidsoap_port }})
set("server.telnet.reverse_dns", false)
# WOW's station track auto-playlist
#+ randomized track playback from the playlist path
#+ play a new random track each time LS performs select()
#+ 90-second timeout on remote track preparation processes
#+ 1.0-hour maximum file length (in case things "run away")
#+ 0.5-hour default file length (in case things "run away")
plist = playlist(
id="playlist",
length=30.0,
default_duration=30.0,
timeout=90.0,
mode="random",
reload=300,
reload_mode="seconds",
mime_type="audio/ogg",
"{{ dir_music }}"
)
# Request Queue from Telnet (Liquidsoap Requester)
requests = request.queue(id="requests")
# Start building the feed with music
radio = plist
# Add in our on-disk security
radio = fallback(id="switcher",track_sensitive = true, [requests, radio, blank(duration=5.)])
# uncomment to normalize the audio stream
#radio = normalize(radio)
# iTunes-style (so-called "dumb" - but good enough) crossfading
full = smart_crossfade(start_next=8., fade_in=6., fade_out=6., width=2., conservative=true, radio)
# Add fallback
full = fallback([radio, blank(duration=5.)])
output.icecast(%vorbis.cbr(samplerate=48000, channels=2, bitrate=164),
host = "{{ icecast2_bind_host }}", port = {{ icecast2_bind_port }},
icy_metadata="true", description="{{ liquidsoap_description }}",
password = "{{ icecast2_source_password }}", mount = "{{ icecast2_mount }}",
full)

@ -0,0 +1,197 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2021, dsc@xmr.pm
from typing import List, Optional, Union
import re
import shutil
import os
import sys
import random
import time
import asyncio
from asyncio.subprocess import Process
from io import TextIOWrapper
import aiofiles
import aiohttp
import jinja2
from jinja2 import Environment, PackageLoader, select_autoescape
import settings
class AsyncSubProcess(object):
def __init__(self, *args, **kwargs):
self.proc: Process = None
self.max_buffer: int = 1000
self.buffer = []
@property
async def is_running(self) -> bool:
return self.proc and self.proc.returncode is None
async def run(self, args: List[str], ws_type_prefix: str):
loop = asyncio.get_event_loop()
read_stdout, write_stdout = os.pipe()
read_stderr, write_stderr = os.pipe()
self.proc = await asyncio.create_subprocess_exec(
*args,
stdin=asyncio.subprocess.PIPE,
stdout=write_stdout,
stderr=write_stderr,
cwd=settings.cwd
)
os.close(write_stdout)
os.close(write_stderr)
f_stdout = os.fdopen(read_stdout, "r")
f_stderr = os.fdopen(read_stderr, "r")
try:
await asyncio.gather(
self.consume(fd=f_stdout, _type='stdout', _type_prefix=ws_type_prefix),
self.consume(fd=f_stderr, _type='stderr', _type_prefix=ws_type_prefix),
self.proc.communicate()
)
finally:
f_stdout.close()
f_stderr.close()
async def consume(self, fd: TextIOWrapper, _type: str, _type_prefix: str):
from ircradio.factory import app
import wow.websockets as websockets
_type_int = 0 if _type == "stdout" else 1
reader = asyncio.StreamReader()
loop = asyncio.get_event_loop()
await loop.connect_read_pipe(
lambda: asyncio.StreamReaderProtocol(reader),
fd
)
async for line in reader:
line = line.strip()
msg = line.decode(errors="ignore")
_logger = app.logger.info if _type_int == 0 else app.logger.error
_logger(msg)
self.buffer.append((int(time.time()), _type_int, msg))
if len(self.buffer) >= self.max_buffer:
self.buffer.pop(0)
await websockets.broadcast(
message=line,
message_type=f"{_type_prefix}_{_type}",
)
async def loopyloop(secs: int, func, after_func=None):
while True:
result = await func()
if after_func:
await after_func(result)
await asyncio.sleep(secs)
def jinja2_render(template_name: str, **data):
loader = jinja2.FileSystemLoader(searchpath=[
os.path.join(settings.cwd, "utils"),
os.path.join(settings.cwd, "ircradio/templates")
])
env = jinja2.Environment(loader=loader, autoescape=select_autoescape())
template = env.get_template(template_name)
return template.render(**data)
async def write_file(fn: str, data: Union[str, bytes], mode="w"):
async with aiofiles.open(fn, mode=mode) as f:
f.write(data)
def write_file_sync(fn: str, data: bytes):
f = open(fn, "wb")
f.write(data)
f.close()
async def executeSQL(sql: str, params: tuple = None):
from ircradio.factory import db
async with db.pool.acquire() as connection:
async with connection.transaction():
result = connection.fetch(sql, params)
return result
def systemd_servicefile(
name: str, description: str, user: str, group: str,
path_executable: str, args_executable: str, env: str = None
) -> bytes:
template = jinja2_render(
"acme.service.jinja2",
name=name,
description=description,
user=user,
group=group,
env=env,
path_executable=path_executable,
args_executable=args_executable
)
return template.encode()
def liquidsoap_version():
ls = shutil.which("liquidsoap")
f = os.popen(f"{ls} --version 2>/dev/null").read()
if not f:
print("please install liquidsoap\n\napt install -y liquidsoap")
sys.exit()
f = f.lower()
match = re.search(r"liquidsoap (\d+.\d+.\d+)", f)
if not match:
return
return match.groups()[0]
def liquidsoap_check_symlink():
msg = """
Due to a bug you need to create this symlink:
$ sudo ln -s /usr/share/liquidsoap/ /usr/share/liquidsoap/1.4.1
info: https://github.com/savonet/liquidsoap/issues/1224
"""
version = liquidsoap_version()
if not os.path.exists(f"/usr/share/liquidsoap/{version}"):
print(msg)
sys.exit()
async def httpget(url: str, json=True, timeout: int = 5, raise_for_status=True, verify_tls=True):
headers = {"User-Agent": random_agent()}
opts = {"timeout": aiohttp.ClientTimeout(total=timeout)}
async with aiohttp.ClientSession(**opts) as session:
async with session.get(url, headers=headers, ssl=verify_tls) as response:
if raise_for_status:
response.raise_for_status()
result = await response.json() if json else await response.text()
if result is None or (isinstance(result, str) and result == ''):
raise Exception("empty response from request")
return result
def random_agent():
from ircradio.factory import user_agents
return random.choice(user_agents)
def print_banner():
print("""\033[91m ▪ ▄▄▄ ▄▄· ▄▄▄ ▄▄▄· ·▄▄▄▄ ▪
· ·
· · ·
. .
. · . \033[0m
""".strip())

@ -0,0 +1,149 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2021, dsc@xmr.pm
import json
import os
import sys
import asyncio
import re
from typing import Optional
import settings
class YouTube:
@staticmethod
async def download(utube_id: str, added_by: str) -> Optional['Song']:
from ircradio.factory import app
from ircradio.models import Song
output = f"{settings.dir_music}/{utube_id}.ogg"
song = Song.by_uid(utube_id)
if song:
if not os.path.exists(output):
# exists in db but not on disk; remove from db
Song.delete().where(Song.utube_id == utube_id).execute()
else:
raise Exception("Song already exists.")
if os.path.exists(output):
song = Song.by_uid(utube_id)
if not song:
# exists on disk but not in db; add to db
return Song.from_filepath(output)
raise Exception("Song already exists.")
try:
proc = await asyncio.create_subprocess_exec(
*["yt-dlp",
"--add-metadata",
"--write-all-thumbnails",
"--write-info-json",
"-f", "bestaudio",
"--max-filesize", "30M",
"--extract-audio",
"--audio-format", "vorbis",
"-o", f"{settings.dir_music}/%(id)s.ogg",
f"https://www.youtube.com/watch?v={utube_id}"],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
result = await proc.communicate()
result = result[0].decode()
if "100%" not in result:
raise Exception("download did not complete")
except Exception as ex:
msg = f"download failed: {ex}"
app.logger.error(msg)
raise Exception(msg)
try:
metadata = YouTube.metadata_from_filepath(output)
if not metadata:
raise Exception("failed to fetch metadata")
if metadata['duration'] > settings.liquidsoap_max_song_duration:
Song.delete_song(utube_id)
raise Exception(f"Song exceeded duration of {settings.liquidsoap_max_song_duration} seconds")
song = Song.create(
duration=metadata['duration'],
title=metadata['name'],
added_by=added_by,
karma=5,
utube_id=utube_id)
return song
except Exception as ex:
app.logger.error(f"{ex}")
raise
@staticmethod
def metadata_from_filepath(filepath: str):
from ircradio.factory import app
import mutagen
try:
metadata = mutagen.File(filepath)
except Exception as ex:
app.logger.error(f"mutagen failure on {filepath}")
return
try:
duration = metadata.info.length
except:
duration = 0
artist = metadata.tags.get('artist')
if artist:
artist = artist[0]
title = metadata.tags.get('title')
if title:
title = title[0]
if not artist or not title:
# try .info.json
path_info = f"{filepath}.info.json"
if os.path.exists(path_info):
try:
blob = json.load(open(path_info,))
artist = blob.get('artist')
title = blob.get('title')
duration = blob.get('duration', 0)
except:
pass
else:
artist = 'Unknown'
title = 'Unknown'
app.logger.warning(f"could not detect artist/title from metadata for {filepath}")
return {
"name": f"{artist} - {title}",
"data": metadata,
"duration": duration,
"path": filepath
}
@staticmethod
async def update_loop():
while True:
await YouTube.update()
await asyncio.sleep(3600)
@staticmethod
async def update():
pip_path = os.path.join(os.path.dirname(sys.executable), "pip")
proc = await asyncio.create_subprocess_exec(
*[sys.executable, pip_path, "install", "--upgrade", "yt-dlp"],
stdout=asyncio.subprocess.PIPE,
stderr=asyncio.subprocess.PIPE)
stdout, stderr = await proc.communicate()
return stdout.decode()
@staticmethod
async def update_task():
while True:
await YouTube.update()
await asyncio.sleep(3600)
@staticmethod
def is_valid_uid(uid: str) -> bool:
return re.match(settings.re_youtube, uid) is not None

@ -0,0 +1,11 @@
quart
yt-dlp
aiofiles
aiohttp
bottom
tinytag
peewee
python-dateutil
mutagen
peewee
discord.py

@ -0,0 +1,94 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2021, dsc@xmr.pm
import os
import pwd
import logging
import shutil
from quart import render_template
import click
from ircradio.factory import create_app
import settings
@click.group()
def cli():
pass
@cli.command(name="generate")
def cli_generate_configs(*args, **kwargs):
"""Generate icecast2/liquidsoap configs and systemd service files"""
from ircradio.utils import jinja2_render, write_file_sync, systemd_servicefile
templates_dir = os.path.join(settings.cwd, "ircradio", "templates")
# liquidsoap service file
path_liquidsoap = shutil.which("liquidsoap")
path_liquidsoap_config = os.path.join(settings.cwd, "data", "soap.liq")
liquidsoap_systemd_service = systemd_servicefile(
name="liquidsoap",
description="liquidsoap service",
user=pwd.getpwuid(os.getuid()).pw_name,
group=pwd.getpwuid(os.getuid()).pw_name,
path_executable=path_liquidsoap,
args_executable=path_liquidsoap_config,
env="")
write_file_sync(fn=os.path.join(settings.cwd, "data", "liquidsoap.service"), data=liquidsoap_systemd_service)
# liquidsoap config
template = jinja2_render("soap.liq.jinja2",
icecast2_bind_host=settings.icecast2_bind_host,
icecast2_bind_port=settings.icecast2_bind_port,
liquidsoap_host=settings.liquidsoap_host,
liquidsoap_port=settings.liquidsoap_port,
icecast2_mount=settings.icecast2_mount,
liquidsoap_description=settings.liquidsoap_description,
icecast2_source_password=settings.icecast2_source_password,
dir_music=settings.dir_music)
write_file_sync(fn=os.path.join(settings.cwd, "data", "soap.liq"), data=template.encode())
# cross.liq
path_liquidsoap_cross_template = os.path.join(templates_dir, "cross.liq.jinja2")
path_liquidsoap_cross = os.path.join(settings.cwd, "data", "cross.liq")
shutil.copyfile(path_liquidsoap_cross_template, path_liquidsoap_cross)
# icecast2.xml
template = jinja2_render("icecast.xml.jinja2",
icecast2_bind_host=settings.icecast2_bind_host,
icecast2_bind_port=settings.icecast2_bind_port,
hostname="localhost",
log_dir=settings.icecast2_logdir,
source_password=settings.icecast2_source_password,
relay_password=settings.icecast2_relay_password,
admin_password=settings.icecast2_admin_password,
dir_music=settings.dir_music)
path_icecast2_config = os.path.join(settings.cwd, "data", "icecast.xml")
write_file_sync(path_icecast2_config, data=template.encode())
# nginx
template = jinja2_render("nginx.jinja2",
icecast2_bind_host=settings.icecast2_bind_host,
icecast2_bind_port=settings.icecast2_bind_port,
hostname=settings.icecast2_hostname,
icecast2_mount=settings.icecast2_mount,
host=settings.host,
port=settings.port)
path_nginx_config = os.path.join(settings.cwd, "data", "radio_nginx.conf")
write_file_sync(path_nginx_config, data=template.encode())
print(f"written config files to {os.path.join(settings.cwd, 'data')}")
@cli.command(name="webdev")
def webdev(*args, **kwargs):
"""Run the web-if, for development purposes"""
from ircradio.factory import create_app
app = create_app()
app.run(settings.host, port=settings.port, debug=settings.debug, use_reloader=False)
if __name__ == '__main__':
cli()

@ -0,0 +1,46 @@
# SPDX-License-Identifier: BSD-3-Clause
# Copyright (c) 2021, dsc@xmr.pm
import os
cwd = os.path.dirname(os.path.realpath(__file__))
def bool_env(val):
return val is True or (isinstance(val, str) and (val.lower() == 'true' or val == '1'))
debug = True
host = "127.0.0.1"
port = 2600
timezone = "America/Los_Angeles"
dir_music = os.environ.get("DIR_MUSIC", os.path.join(cwd, "data", "music"))
enable_search_route = bool_env(os.environ.get("ENABLE_SEARCH_ROUTE", False))
discord_token = "xxxxxxx" # https://discord.com/developers/applications
discord_admins = ["lza.art101.eth#7207"]
discord_command_prefix = "!"
icecast2_hostname = "localhost"
icecast2_max_clients = 32
icecast2_bind_host = "127.0.0.1"
icecast2_bind_port = 24100
icecast2_mount = "radio.ogg"
icecast2_source_password = "changeme"
icecast2_admin_password = "changeme"
icecast2_relay_password = "changeme" # for livestreams
icecast2_live_mount = "live.ogg"
icecast2_logdir = "/var/log/icecast2/"
liquidsoap_host = "127.0.0.1"
liquidsoap_port = 7555 # telnet
liquidsoap_description = "IRC!Radio"
liquidsoap_samplerate = 48000
liquidsoap_bitrate = 164 # youtube is max 164kbps
liquidsoap_crossfades = False # not implemented yet
liquidsoap_normalize = False # not implemented yet
liquidsoap_iface = icecast2_mount.replace(".", "(dot)")
liquidsoap_max_song_duration = 60 * 11 # seconds
re_youtube = r"[a-zA-Z0-9_-]{11}$"
Loading…
Cancel
Save