diff --git a/xmrnodes/cli.py b/xmrnodes/cli.py
index 29c9681..04991ad 100644
--- a/xmrnodes/cli.py
+++ b/xmrnodes/cli.py
@@ -88,78 +88,75 @@ def check_node(_node):
         _hc.delete_instance()
         node.delete_instance()
 
-
-@bp.cli.command("get_peers")
-def get_peers():
-    """
-    This command requests peers from the configured upstream node and fans out
-    to recursively scrape all other peers on the network. This will take
-    several hours to run.
-    """
-    # keep track of all peers
-    all_peers = []
-    print("[+] Preparing to crawl Monero p2p network")
-    print(f"[.] Retrieving initial peers from {config.NODE_HOST}:{config.NODE_PORT}")
-
-    # start initial list of peers to scan
-    peers_to_scan = retrieve_peers(config.NODE_HOST, config.NODE_PORT)
-    print(f"[+] Found {len(peers_to_scan)} initial peers to begin scraping.")
-    sleep(3)
-
-    # helper function to add a new peer to the db or update an existing one
-    def save_peer(peer):
-        with geoip2.database.Reader("./data/GeoLite2-City.mmdb") as reader:
-            _url = urlparse(peer)
-            url = f"{_url.scheme}://{_url.netloc}".lower()
-            # add new peer if not in db
-            if not Peer.select().where(Peer.url == peer).exists():
-                response = reader.city(_url.hostname)
+def upsert_peer(peer):
+    exists = Peer.select().where(Peer.url == peer).first()
+    if exists:
+        exists.datetime = datetime.utcnow()
+        exists.save()
+    else:
+        with geoip2.database.Reader("./data/GeoLite2-City.mmdb") as geodb:
+            try:
+                u = urlparse(peer)
+                _url = f"{u.scheme}://{u.netloc}".lower()
+                geodata = geodb.city(u.hostname)
                 p = Peer(
-                    url=peer,
-                    country=response.country.name,
-                    city=response.city.name,
-                    postal=response.postal.code,
-                    lat=response.location.latitude,
-                    lon=response.location.longitude,
+                    url=_url,
+                    country=geodata.country.name,
+                    city=geodata.city.name,
+                    postal=geodata.postal.code,
+                    lat=geodata.location.latitude,
+                    lon=geodata.location.longitude,
                 )
                 p.save()
-                print(f"{peer} - saving new peer")
-            # or update if it does
+            except Exception:
+                pass
+
+def _get_peers():
+    """
+    This command goes through the oldest nodes and scans them for more peers.
+    Unresponsive peers get deleted. Responsive peers get their datestamp refreshed
+    to move to the top of the list. It only crawls a subset of peers and is intended
+    to be run at intervals. The script will automatically prune out peers over time.
+    """
+    # crawl existing peers
+    peers = Peer.select().order_by(Peer.datetime.asc()).limit(20)
+    for peer in peers:
+        try:
+            new_peers = retrieve_peers(peer.hostname, peer.port)
+            if new_peers:
+                new = []
+                for new_peer in new_peers:
+                    exists = Peer.select().where(Peer.url == new_peer).first()
+                    if not exists:
+                        new.append(new_peer)
+                print(f"+++ Found {len(new)} more peers from {peer.url}")
+                upsert_peer(peer.url)
+                for new_peer in new_peers:
+                    upsert_peer(new_peer)
             else:
-                p = Peer.select().where(Peer.url == peer).first()
-                p.datetime = datetime.now()
-                p.save()
-            return _url
+                raise Exception('dead node')
+        except Exception:
+            print(f"--- Dead peer {peer.url}")
+            peer.delete_instance()
+
+    # if no peers are available in the database, seed the scan list from the upstream node
+    if not peers:
+        print(f"[.] Retrieving peers from {config.NODE_HOST}:{config.NODE_PORT}")
+        peers_to_scan = retrieve_peers(config.NODE_HOST, config.NODE_PORT)
+        print(f"[+] Found {len(peers_to_scan)} initial peers to begin scraping.")
+        for peer in peers_to_scan:
+            upsert_peer(peer)
 
-    # iterate over the whole list until all peers have been scanned
-    # add new peers to the list
-    # skip the peer if we've seen it already
+    # rw_cache("map_peers", list(Peer.select().execute()))
+
+@bp.cli.command("get_peers")
+def get_peers():
     try:
-        while peers_to_scan:
-            _peer = peers_to_scan[0]
-            peers_to_scan.pop(0)
-            if _peer in all_peers:
-                print(f'already found {_peer}')
-                continue
-            all_peers.append(_peer)
-            try:
-                peer = save_peer(_peer)
-                peers_to_scan += retrieve_peers(peer.hostname, peer.port)
-            except:
-                pass
+        _get_peers()
     except KeyboardInterrupt:
-        print('Stopped.')
-
-    print(
-        f"[+] Found {len(all_peers)} peers from {config.NODE_HOST}:{config.NODE_PORT}"
-    )
-    print("[+] Deleting old Monero p2p peers")
-    for p in Peer.select():
-        if p.hours_elapsed() > config.PEER_LIFETIME:
-            print(f"[.] Deleting {p.url}")
-            p.delete_instance()
-    rw_cache("map_peers", list(Peer.select().execute()))
-
+        print("Stopped")
+    except Exception as e:
+        print(f"Error: {e}")
 
 @bp.cli.command("validate")
 def validate():
diff --git a/xmrnodes/config.py b/xmrnodes/config.py
index 2b8cb62..d9ff566 100644
--- a/xmrnodes/config.py
+++ b/xmrnodes/config.py
@@ -13,7 +13,6 @@ TOR_PORT = environ.get("TOR_PORT", 9050)
 NODE_HOST = environ.get("NODE_HOST", "singapore.node.xmr.pm")
 NODE_PORT = environ.get("NODE_PORT", 18080)
 HEALTHY_BLOCK_DIFF = int(environ.get("HEALTHY_BLOCK_DIFF", 500))
-PEER_LIFETIME = int(environ.get("PEER_LIFETIME", 96))
 I2P_HOST = environ.get("I2P_HOST", "127.0.0.1")
 I2P_PORT = environ.get("I2P_PORT", 4444)
 DONATE_ADDRESS = environ.get("DONATE_ADDRESS", "878ca636oEHcjZ3Zimuwx4AZTbeMtqY11eVihramcBgVciC254jpUm9AxbwAd57nxv1HRE9AGG1cXBkvmRzfsFXh1L6f2CU")
\ No newline at end of file
diff --git a/xmrnodes/helpers.py b/xmrnodes/helpers.py
index 10fc6d4..53a43d4 100644
--- a/xmrnodes/helpers.py
+++ b/xmrnodes/helpers.py
@@ -102,13 +102,11 @@ def rw_cache(key_name, data=None):
 
 def retrieve_peers(host, port):
     try:
-        print(f"[.] Connecting to {host}:{port}")
         sock = socket.socket()
         sock.settimeout(5)
         sock.connect((host, int(port)))
-    except:
-        sys.stderr.write("unable to connect to %s:%d\n" % (host, int([port])))
-        sys.exit()
+    except Exception:
+        return None
 
     bucket = Bucket.create_handshake_request()
 
@@ -121,11 +119,9 @@ def retrieve_peers(host, port):
     while 1:
         buffer = sock.recv(8)
         if not buffer:
-            sys.stderr.write("Invalid response; exiting\n")
            break
 
         if not buffer.startswith(bytes(LEVIN_SIGNATURE)):
-            sys.stderr.write("Invalid response; exiting\n")
            break
 
        bucket = Bucket.from_buffer(signature=buffer, sock=sock)
diff --git a/xmrnodes/models.py b/xmrnodes/models.py
index 672e09d..975036b 100644
--- a/xmrnodes/models.py
+++ b/xmrnodes/models.py
@@ -61,6 +61,14 @@ class Peer(Model):
     lon = FloatField(null=True)
     datetime = DateTimeField(default=datetime.utcnow)
 
+    @property
+    def port(self):
+        return urlparse(self.url).port
+
+    @property
+    def hostname(self):
+        return urlparse(self.url).hostname
+
     def hours_elapsed(self):
         now = datetime.utcnow()
         diff = now - self.datetime
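
Note on the changed failure contract: retrieve_peers now returns None when it
cannot connect, instead of writing to stderr and calling sys.exit(), so callers
must treat a falsy result as a dead peer. A minimal sketch of the calling
pattern (the peer address below is a placeholder; _get_peers performs the same
check before pruning a row):

    from xmrnodes.cli import upsert_peer
    from xmrnodes.helpers import retrieve_peers

    # Placeholder address; any unreachable peer exercises the failure path.
    new_peers = retrieve_peers("198.51.100.1", 18080)
    if not new_peers:
        print("--- dead peer")  # _get_peers deletes the Peer row in this case
    else:
        for peer in new_peers:
            upsert_peer(peer)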
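
The new Peer.hostname and Peer.port properties assume the URLs stored by
upsert_peer are normalized, lowercased scheme://host:port strings, which
urlparse can split back apart. A quick illustration with a hypothetical stored
peer URL:

    from urllib.parse import urlparse

    u = urlparse("tcp://203.0.113.7:18080")  # hypothetical Peer.url value
    print(u.hostname)  # "203.0.113.7" -> what Peer.hostname returns
    print(u.port)      # 18080 -> what Peer.port returns

Because upsert_peer refreshes a peer's datetime on every successful contact,
the order_by(Peer.datetime.asc()).limit(20) query in _get_peers rotates through
the table oldest-first, which is what lets the command run in short batches at
intervals rather than crawling the whole network for hours.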