You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.
monero.fail/xmrnodes/helpers.py

156 lines
4.4 KiB
Python

import sys
import socket
import pickle
from time import sleep
from os import path
import zmq
from requests import get as r_get
from urllib.parse import urlparse
from levin.bucket import Bucket
from levin.ctypes import *
from levin.constants import LEVIN_SIGNATURE
from xmrnodes.models import Node
from xmrnodes import config
def check_zmq(url: str):
    """Probe a node's ZMQ pub endpoint (fixed port 18084) for activity.

    Derives the host from *url*, connects a SUB socket to
    ``tcp://<host>:18084``, reads up to 5 multipart messages and prints
    them. Best-effort: any failure is printed and swallowed.

    :param url: node URL whose host part is used for the ZMQ endpoint
    """
    node_socket = urlparse(url).netloc
    node_url = node_socket.split(':')[0]
    port = 18084
    tcp_node = f'tcp://{node_url}:{port}'
    print('Checking ZMQ for node {}'.format(tcp_node))
    ctx = zmq.Context()
    subscriber = ctx.socket(zmq.SUB)
    try:
        subscriber.setsockopt_string(zmq.SUBSCRIBE, '')
        subscriber.setsockopt(zmq.CONFLATE, 1)
        # Bound the wait: without RCVTIMEO a dead/silent endpoint would
        # block recv_multipart() forever and hang the caller.
        subscriber.setsockopt(zmq.RCVTIMEO, 15000)
        subscriber.connect(tcp_node)
        count = 0
        while count < 5:
            data = subscriber.recv_multipart()
            count += 1
            print('success: {}'.format(data))
    except Exception as e:
        print(e)
    finally:
        # Always release the socket and context; the original leaked both
        # on any error and only terminated the context on success.
        subscriber.close(0)
        ctx.term()
def make_request(url: str, path="/get_info", data=None):
    """Issue a GET to ``url + path``, routing .onion hosts through Tor.

    Onion URLs are sent via the configured SOCKS5 proxy with a longer
    timeout; clearnet URLs go direct. TLS verification is disabled
    (self-signed node certs are common).

    :param url: base node URL
    :param path: endpoint path appended to the URL
    :param data: optional JSON body forwarded to requests
    :returns: the requests response object
    :raises requests.HTTPError: on a non-2xx status
    """
    if is_onion(url):
        # Tor traffic needs the socks5h proxy (remote DNS) and more time.
        proxy_uri = f"socks5h://{config.TOR_HOST}:{config.TOR_PORT}"
        proxies = {"http": proxy_uri, "https": proxy_uri}
        timeout = 30
    else:
        proxies = None
        timeout = 10
    res = r_get(
        url + path,
        timeout=timeout,
        proxies=proxies,
        json=data,
        headers={"Origin": "https://monero.fail"},
        verify=False,
    )
    res.raise_for_status()
    return res
def determine_crypto(url):
    """Identify which chain a node serves by its genesis (height 0) hash.

    Queries the node's /json_rpc endpoint for the block header at height 0
    and matches its hash against known Monero/Wownero network genesis
    hashes.

    :param url: base node URL passed to make_request
    :returns: "monero", "wownero", or "unknown" (also on any failure)
    """
    data = {"method": "get_block_header_by_height", "params": {"height": 0}}
    hashes = {
        "monero": [
            "418015bb9ae982a1975da7d79277c2705727a56894ba0fb246adaabb1f4632e3", #mainnet
            "48ca7cd3c8de5b6a4d53d2861fbdaedca141553559f9be9520068053cda8430b", #testnet
            "76ee3cc98646292206cd3e86f74d88b4dcc1d937088645e9b0cbca84b7ce74eb" #stagenet
        ],
        "wownero": [
            "a3fd635dd5cb55700317783469ba749b5259f0eeac2420ab2c27eb3ff5ffdc5c", #mainnet
            "d81a24c7aad4628e5c9129f8f2ec85888885b28cf468597a9762c3945e9f29aa", #testnet
        ]
    }
    try:
        r = make_request(url, "/json_rpc", data)
        payload = r.json()
        # Explicit check instead of `assert` (asserts vanish under -O).
        if "result" not in payload:
            return "unknown"
        # `block_hash` instead of `hash` to avoid shadowing the builtin.
        block_hash = payload["result"]["block_header"]["hash"]
        for crypto, genesis_hashes in hashes.items():
            if block_hash in genesis_hashes:
                return crypto
        return "unknown"
    except Exception:
        # Narrowed from a bare `except:` so SystemExit / KeyboardInterrupt
        # still propagate; any RPC/parse failure means "unknown".
        return "unknown"
def is_onion(url: str):
    """Return True when *url* points at a Tor hidden service.

    Heuristic kept from the original: split on ':' and test whether the
    second segment (normally '//host' in a scheme-qualified URL) ends in
    '.onion'. URLs without any ':' are never onions.
    """
    parts = url.split(":")
    return len(parts) >= 2 and parts[1].endswith(".onion")
# Use hacky filesystem cache since i dont feel like shipping redis
def rw_cache(key_name, data=None):
    """Read or write a pickled value at ``DATA_DIR/<key_name>.pkl``.

    When *data* is supplied the value is written and returned; otherwise
    the cached value is loaded and returned (raises FileNotFoundError if
    the cache file does not exist yet).

    :param key_name: cache key, used as the pickle file name
    :param data: value to store, or None to read
    """
    pickle_file = path.join(config.DATA_DIR, f'{key_name}.pkl')
    # Compare against None, not truthiness: the original `if data:` made a
    # falsy payload ([], {}, 0) silently fall through to a *read*.
    if data is not None:
        with open(pickle_file, 'wb') as f:
            pickle.dump(data, f)
        return data
    with open(pickle_file, 'rb') as f:
        return pickle.load(f)
def retrieve_peers(host, port):
    """Handshake with a node's p2p port over the levin protocol and collect
    the peer list it advertises.

    :param host: node hostname or IP
    :param port: node p2p port (anything int() accepts)
    :returns: list of 'http://ip:port' strings, or None when none obtained
    :raises SystemExit: when the TCP connection cannot be opened
        (preserved from the original CLI-oriented behavior)
    """
    try:
        print(f'[.] Connecting to {host}:{port}')
        sock = socket.socket()
        sock.settimeout(5)
        sock.connect((host, int(port)))
    except Exception:
        # Original used int([port]) here, which raised TypeError while
        # formatting the error message instead of printing it.
        sys.stderr.write("unable to connect to %s:%d\n" % (host, int(port)))
        sys.exit()
    bucket = Bucket.create_handshake_request()
    sock.send(bucket.header())
    sock.send(bucket.payload())
    peers = []
    try:
        while True:
            buffer = sock.recv(8)
            if not buffer or not buffer.startswith(bytes(LEVIN_SIGNATURE)):
                sys.stderr.write("Invalid response; exiting\n")
                break
            bucket = Bucket.from_buffer(signature=buffer, sock=sock)
            # Command 1001 is the handshake response carrying the peer list;
            # once received there is nothing further to read.
            if bucket.command == 1001:
                for peer in bucket.get_peers() or []:
                    try:
                        peers.append('http://%s:%d' % (peer['ip'].ip, peer['port'].value))
                    except (KeyError, AttributeError, TypeError):
                        pass  # skip malformed peer entries
                break
    finally:
        # Close on every exit path; the original leaked the socket when it
        # broke out on an invalid/empty response.
        sock.close()
    return peers or None
def get_highest_block(nettype, crypto):
    """Return the greatest ``last_height`` among validated nodes for the
    given network type and coin.

    :param nettype: network type to filter on (e.g. mainnet/testnet)
    :param crypto: coin to filter on (e.g. "monero")
    :returns: the highest known block height, or 0 when no validated node
        matches (the original raised AttributeError on None here)
    """
    highest = Node.select().where(
        Node.validated == True,
        Node.nettype == nettype,
        Node.crypto == crypto
    ).order_by(Node.last_height.desc()).limit(1).first()
    # .first() yields None on an empty result set; treat that as height 0.
    if highest is None:
        return 0
    return highest.last_height