You cannot select more than 25 topics Topics must start with a letter or number, can include dashes ('-') and can be up to 35 characters long.

210 lines
6.6 KiB
Python

import requests
from lws.models import User
from lws import config
# Admin endpoint payload reference (the "params" object sent to each endpoint):
#   accept_requests:       {"type": "import"|"create", "addresses": [...]}
#   add_account:           {"address": ..., "key": ...}
#   list_accounts:         {}
#   list_requests:         {}
#   modify_account_status: {"status": "active"|"hidden"|"inactive", "addresses": [...]}
#   reject_requests:       {"type": "import"|"create", "addresses": [...]}
#   rescan:                {"height": ..., "addresses": [...]}
#   webhook_add:           {"type": "tx-confirmation", "address": "...", "url": "...", ...}
#     optional fields:
#       token:      a string to be returned when the webhook is triggered
#       payment_id: 16 hex characters representing a unique identifier for a transaction
#   webhook_delete:        {"addresses": [...]}
#   webhook_delete_uuid:   {"event_ids": [...]}
#   webhook_list:          {}
def get_height() -> int:
    """Return the ``height`` reported by the daemon, or 0 if it cannot be read.

    Sends a GET to ``{config.MONEROD_URL}/get_info`` and converts the
    ``height`` field of the JSON reply to ``int``.  Any failure (connection
    error, timeout, HTTP error status, missing or non-numeric field) is
    printed and reported as 0 instead of being raised.
    """
    try:
        # Bound the wait like the other HTTP calls in this module do; without
        # a timeout, requests will block indefinitely on an unresponsive host.
        r = requests.get(f"{config.MONEROD_URL}/get_info", timeout=5)
        print(r.content)
        # Surface 4xx/5xx as an explicit HTTP error rather than a KeyError
        # from a body that lacks "height".
        r.raise_for_status()
        return int(r.json()["height"])
    except Exception as e:
        print(e)
        return 0
class LWS:
    """HTTP client for the LWS service configured in ``config``.

    Wallet-facing calls go to ``config.LWS_URL``; admin calls go to
    ``config.LWS_ADMIN_URL`` and carry ``self.admin_key`` as the ``auth``
    field.  Except for :meth:`get_address_info`, every request method is
    best-effort: failures are printed and an empty dict is returned.
    """

    # Seconds to wait for each HTTP request issued by this client.
    REQUEST_TIMEOUT = 5

    def __init__(self):
        # Defined up front so the attribute always exists; the real key is
        # supplied later through init() or _init().
        self.admin_key = None

    def init(self, admin_key):
        """Use *admin_key* as the ``auth`` value for admin requests."""
        self.admin_key = admin_key

    def _init(self):
        """Load the admin key from the first ``User`` row's ``view_key``."""
        # NOTE(review): presumably first() yields None when no user exists,
        # which would raise AttributeError here -- confirm against callers.
        self.admin_key = User.select().first().view_key

    def _post_json(self, url: str, payload: dict, failure: str) -> dict:
        """POST *payload* as JSON to *url* and return the decoded reply.

        On any exception (connection error, timeout, HTTP error status,
        undecodable body) prints ``"<failure>: <error>"`` and returns ``{}``.
        """
        try:
            resp = requests.post(url, json=payload, timeout=self.REQUEST_TIMEOUT)
            # raise_for_status() already rejects every non-ok status, so no
            # separate ``ok`` check is needed before decoding.
            resp.raise_for_status()
            return resp.json()
        except Exception as e:
            print(f"{failure}: {e}")
            return {}

    def _admin_post(self, method: str, failure: str, params=None) -> dict:
        """Call admin endpoint *method*, authenticating with ``admin_key``.

        *params*, when given, is sent under the ``"params"`` key; when it is
        None the payload contains only ``"auth"``.
        """
        payload = {"auth": self.admin_key}
        if params is not None:
            payload["params"] = params
        return self._post_json(f"{config.LWS_ADMIN_URL}/{method}", payload, failure)

    def get_address_info(self, address, view_key):
        """Return the ``get_address_info`` reply for a wallet.

        Unlike the other request methods this one does not swallow errors:
        request exceptions and HTTP error statuses propagate to the caller.
        """
        endpoint = f"{config.LWS_URL}/get_address_info"
        data = {
            "address": address,
            "view_key": view_key,
        }
        r = requests.post(endpoint, json=data, timeout=self.REQUEST_TIMEOUT)
        r.raise_for_status()
        return r.json()

    def get_wallet(self, address: str) -> dict:
        """Find *address* in the ``list_accounts`` reply.

        The reply is treated as a mapping of status -> list of wallet dicts.
        The matching wallet dict is returned with a ``"status"`` key added;
        ``{}`` is returned when there is no match or the lookup fails.
        """
        try:
            accounts = self.list_accounts()
            for status, wallets in accounts.items():
                for wallet in wallets:
                    if wallet["address"] == address:
                        wallet["status"] = status
                        return wallet
            return {}
        except Exception as e:
            print(f"Failed to check wallet active: {e}")
            return {}

    def exists(self, address: str) -> bool:
        """Return True when :meth:`get_wallet` finds *address*."""
        # get_wallet() handles its own errors and returns {} on failure, so
        # truthiness of its result is the whole answer.
        return bool(self.get_wallet(address))

    def list_accounts(self) -> dict:
        """Return the admin ``list_accounts`` reply, or ``{}`` on failure."""
        return self._admin_post("list_accounts", "Failed to list accounts")

    def list_requests(self) -> dict:
        """Return the admin ``list_requests`` reply, or ``{}`` on failure."""
        return self._admin_post("list_requests", "Failed to list requests")

    def get_address_txs(self, address: str, view_key: str) -> dict:
        """Return the ``get_address_txs`` reply for a wallet, or ``{}`` on failure."""
        data = {
            "address": address,
            "view_key": view_key,
        }
        return self._post_json(
            f"{config.LWS_URL}/get_address_txs",
            data,
            f"Failed to get wallet info {address}",
        )

    def add_wallet(self, address: str, view_key: str) -> dict:
        """Call admin ``add_account`` for *address*; ``{}`` on failure."""
        return self._admin_post(
            "add_account",
            f"Failed to add wallet {address}",
            {"address": address, "key": view_key},
        )

    def modify_wallet(self, address: str, status: str) -> dict:
        """Call admin ``modify_account_status`` for *address*; ``{}`` on failure."""
        return self._admin_post(
            "modify_account_status",
            f"Failed to modify wallet {address}",
            {"addresses": [address], "status": status},
        )

    def accept_request(self, address: str, req_type: str = "create") -> dict:
        """Call admin ``accept_requests`` for *address*; ``{}`` on failure."""
        return self._admin_post(
            "accept_requests",
            f"Failed to accept request wallet {address}",
            {"addresses": [address], "type": req_type},
        )

    def reject_request(self, address: str, req_type: str = "create") -> dict:
        """Call admin ``reject_requests`` for *address*; ``{}`` on failure."""
        return self._admin_post(
            "reject_requests",
            f"Failed to reject request wallet {address}",
            {"addresses": [address], "type": req_type},
        )

    def rescan(self, address: str, height: int) -> dict:
        """Call admin ``rescan`` for *address* from *height*; ``{}`` on failure."""
        return self._admin_post(
            "rescan",
            f"Failed to rescan wallet {address}",
            {"addresses": [address], "height": height},
        )
# Shared module-level client instance. Call init() or _init() on it to set
# admin_key before using the admin endpoints.
lws = LWS()