performance improvements on get_peers script

main
lza_menace 5 months ago
parent 4dc78b3be8
commit 5bc00debc3

@@ -88,78 +88,75 @@ def check_node(_node):
         _hc.delete_instance()
         node.delete_instance()
 
-@bp.cli.command("get_peers")
-def get_peers():
-    """
-    This command requests peers from the configured upstream node and fans out
-    to recursively scrape all other peers on the network. This will take
-    several hours to run.
-    """
-    # keep track of all peers
-    all_peers = []
-    print("[+] Preparing to crawl Monero p2p network")
-    print(f"[.] Retrieving initial peers from {config.NODE_HOST}:{config.NODE_PORT}")
-    # start initial list of peers to scan
-    peers_to_scan = retrieve_peers(config.NODE_HOST, config.NODE_PORT)
-    print(f"[+] Found {len(peers_to_scan)} initial peers to begin scraping.")
-    sleep(3)
-
-    # helper function to add a new peer to the db or update an existing one
-    def save_peer(peer):
-        with geoip2.database.Reader("./data/GeoLite2-City.mmdb") as reader:
-            _url = urlparse(peer)
-            url = f"{_url.scheme}://{_url.netloc}".lower()
-            # add new peer if not in db
-            if not Peer.select().where(Peer.url == peer).exists():
-                response = reader.city(_url.hostname)
-                p = Peer(
-                    url=peer,
-                    country=response.country.name,
-                    city=response.city.name,
-                    postal=response.postal.code,
-                    lat=response.location.latitude,
-                    lon=response.location.longitude,
-                )
-                p.save()
-                print(f"{peer} - saving new peer")
-            # or update if it does
-            else:
-                p = Peer.select().where(Peer.url == peer).first()
-                p.datetime = datetime.now()
-                p.save()
-            return _url
-
-    # iterate over the whole list until all peers have been scanned
-    # add new peers to the list
-    # skip the peer if we've seen it already
-    try:
-        while peers_to_scan:
-            _peer = peers_to_scan[0]
-            peers_to_scan.pop(0)
-            if _peer in all_peers:
-                print(f'already found {_peer}')
-                continue
-            all_peers.append(_peer)
-            try:
-                peer = save_peer(_peer)
-                peers_to_scan += retrieve_peers(peer.hostname, peer.port)
-            except:
-                pass
-    except KeyboardInterrupt:
-        print('Stopped.')
-
-    print(
-        f"[+] Found {len(all_peers)} peers from {config.NODE_HOST}:{config.NODE_PORT}"
-    )
-    print("[+] Deleting old Monero p2p peers")
-    for p in Peer.select():
-        if p.hours_elapsed() > config.PEER_LIFETIME:
-            print(f"[.] Deleting {p.url}")
-            p.delete_instance()
-    rw_cache("map_peers", list(Peer.select().execute()))
+def upsert_peer(peer):
+    exists = Peer.select().where(Peer.url == peer).first()
+    if exists:
+        exists.datetime = datetime.utcnow()
+        exists.save()
+    else:
+        with geoip2.database.Reader("./data/GeoLite2-City.mmdb") as geodb:
+            try:
+                u = urlparse(peer)
+                _url = f"{u.scheme}://{u.netloc}".lower()
+                geodata = geodb.city(u.hostname)
+                p = Peer(
+                    url=_url,
+                    country=geodata.country.name,
+                    city=geodata.city.name,
+                    postal=geodata.postal.code,
+                    lat=geodata.location.latitude,
+                    lon=geodata.location.longitude,
+                )
+                p.save()
+            except Exception as e:
+                pass
+
+def _get_peers():
+    """
+    This command will go through the oldest known nodes and scan them for more peers.
+    Unresponsive peers get deleted. Responsive peers get their datestamp refreshed to move
+    to the top of the list. It only crawls a subset of peers and is intended to be
+    run in intervals. The script will automatically prune out peers over time.
+    """
+    # crawl existing peers, oldest first
+    peers = Peer.select().order_by(Peer.datetime.asc()).limit(20)
+    for peer in peers:
+        try:
+            new_peers = retrieve_peers(peer.hostname, peer.port)
+            if new_peers:
+                new = []
+                for new_peer in new_peers:
+                    exists = Peer.select().where(Peer.url == new_peer).first()
+                    if not exists:
+                        new.append(new_peer)
+                print(f"+++ Found {len(new)} more peers from {peer.url}")
+                upsert_peer(peer.url)
+                for new_peer in new_peers:
+                    upsert_peer(new_peer)
+            else:
+                raise Exception('dead node')
+        except Exception as e:
+            print(f"--- Dead peer {peer.url}")
+            peer.delete_instance()
+
+    # if no peers are available in the database then get a list of peers to scan from the upstream node
+    if not peers:
+        print(f"[.] Retrieving peers from {config.NODE_HOST}:{config.NODE_PORT}")
+        peers_to_scan = retrieve_peers(config.NODE_HOST, config.NODE_PORT)
+        print(f"[+] Found {len(peers_to_scan)} initial peers to begin scraping.")
+        for peer in peers_to_scan:
+            upsert_peer(peer)
+
+    # rw_cache("map_peers", list(Peer.select().execute()))
+
+@bp.cli.command("get_peers")
+def get_peers():
+    try:
+        _get_peers()
+    except KeyboardInterrupt:
+        print("Stopped")
+    except Exception as e:
+        print(f"Error: {e}")
 
 @bp.cli.command("validate")
 def validate():
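The reworked command only touches the 20 oldest peers per pass, so the full network map now emerges from repeated runs rather than from one multi-hour crawl. A minimal sketch of that interval-driven usage, assuming _get_peers can be imported from the CLI module and picking an arbitrary 15-minute interval (both are assumptions, not part of this commit):

    import time
    # _get_peers is defined in the CLI module above; the import path depends on the repo layout

    def crawl_forever(interval_seconds=900):  # assumed interval, not from the repo
        while True:
            try:
                _get_peers()  # scan the 20 oldest peers, refresh live ones, drop dead ones
            except Exception as e:
                print(f"Error: {e}")
            time.sleep(interval_seconds)

In practice the registered get_peers command would simply be invoked on a schedule (cron or similar) instead of running a loop like this in-process.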

@@ -13,7 +13,6 @@ TOR_PORT = environ.get("TOR_PORT", 9050)
 NODE_HOST = environ.get("NODE_HOST", "singapore.node.xmr.pm")
 NODE_PORT = environ.get("NODE_PORT", 18080)
 HEALTHY_BLOCK_DIFF = int(environ.get("HEALTHY_BLOCK_DIFF", 500))
-PEER_LIFETIME = int(environ.get("PEER_LIFETIME", 96))
 I2P_HOST = environ.get("I2P_HOST", "127.0.0.1")
 I2P_PORT = environ.get("I2P_PORT", 4444)
 DONATE_ADDRESS = environ.get("DONATE_ADDRESS", "878ca636oEHcjZ3Zimuwx4AZTbeMtqY11eVihramcBgVciC254jpUm9AxbwAd57nxv1HRE9AGG1cXBkvmRzfsFXh1L6f2CU")

@@ -102,13 +102,11 @@ def rw_cache(key_name, data=None):
 def retrieve_peers(host, port):
     try:
-        print(f"[.] Connecting to {host}:{port}")
         sock = socket.socket()
         sock.settimeout(5)
         sock.connect((host, int(port)))
-    except:
-        sys.stderr.write("unable to connect to %s:%d\n" % (host, int([port])))
-        sys.exit()
+    except Exception as e:
+        return None
 
     bucket = Bucket.create_handshake_request()
@@ -121,11 +119,9 @@ def retrieve_peers(host, port):
     while 1:
         buffer = sock.recv(8)
 
         if not buffer:
-            sys.stderr.write("Invalid response; exiting\n")
             break
 
         if not buffer.startswith(bytes(LEVIN_SIGNATURE)):
-            sys.stderr.write("Invalid response; exiting\n")
             break
         bucket = Bucket.from_buffer(signature=buffer, sock=sock)

@@ -61,6 +61,14 @@ class Peer(Model):
     lon = FloatField(null=True)
     datetime = DateTimeField(default=datetime.utcnow)
 
+    @property
+    def port(self):
+        return urlparse(self.url).port
+
+    @property
+    def hostname(self):
+        return urlparse(self.url).hostname
+
     def hours_elapsed(self):
         now = datetime.utcnow()
         diff = now - self.datetime
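The new properties assume Peer.url holds a full scheme://host:port string, which is what upsert_peer now saves. For reference, Python's standard urlparse pulls the pieces out like this (the node URL below is just an example):

    from urllib.parse import urlparse

    u = urlparse("http://node.example.org:18080")
    print(u.hostname)  # node.example.org
    print(u.port)      # 18080 (an int); None if the URL carries no port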
