bump_python/bump/bump.py
raphael bfd30324e1 senders can now be added and removed via cli
- --reset flag (default false) can now be set
  to remove all senders
- multiple senders can be added now
2022-01-17 11:44:43 +01:00

258 lines
7.7 KiB
Python

import time
import json
import random
import qrcode
import requests
import base64
import re
import os
import typer
from random_word import RandomWords
# Number of leading words of a secret that are used as the sender id
# (see Bump._get_sender); the words after them form the password.
SENDER_LENGTH = 4
# Number of password words: a generated secret has SENDER_LENGTH +
# PASSWORD_LENGTH words, and validation requires at least that many.
PASSWORD_LENGTH = 8
import secrets
from base64 import urlsafe_b64encode as b64e, urlsafe_b64decode as b64d
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
# Shared random-word source used by Bump._generate_keywords.
r = RandomWords()
# Backend object handed to PBKDF2HMAC in _derive_key.
backend = default_backend()
# Default PBKDF2 iteration count used when deriving keys for encryption.
iterations = 100_000
def _derive_key(password: bytes, salt: bytes, iterations: int = iterations) -> bytes:
    """Derive a secret key from a given password and salt.

    Runs PBKDF2-HMAC-SHA256 over ``password`` with ``salt`` for
    ``iterations`` rounds to obtain 32 bytes, and returns them url-safe
    base64 encoded (the form ``Fernet`` is constructed with in this module).
    """
    raw_key = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=iterations,
        backend=backend,
    ).derive(password)
    return b64e(raw_key)
def password_encrypt(message: bytes, password: str, iterations: int = iterations) -> bytes:
    """Encrypt ``message`` with a key derived from ``password``.

    A fresh 16-byte salt is generated for every call. The returned token is
    the url-safe base64 encoding of three concatenated fields:

    * the 16-byte salt,
    * the iteration count as a 4-byte big-endian integer,
    * the base64-decoded Fernet token of ``message``.

    ``password_decrypt`` reads the same layout back.
    """
    salt = secrets.token_bytes(16)
    fernet = Fernet(_derive_key(password.encode(), salt, iterations))
    iteration_field = iterations.to_bytes(4, 'big')
    ciphertext = b64d(fernet.encrypt(message))
    return b64e(salt + iteration_field + ciphertext)
def password_decrypt(token: bytes, password: str) -> bytes:
    """Decrypt a token produced by ``password_encrypt``.

    The token is base64-decoded and split into its fields: bytes 0-15 are
    the salt, bytes 16-19 the big-endian iteration count, and the remainder
    is the Fernet token (re-encoded to url-safe base64 before decryption).
    The key is re-derived from ``password`` with the embedded salt and
    iteration count.
    """
    raw = b64d(token)
    salt = raw[:16]
    iteration_count = int.from_bytes(raw[16:20], 'big')
    fernet_token = b64e(raw[20:])
    key = _derive_key(password.encode(), salt, iteration_count)
    return Fernet(key).decrypt(fernet_token)
class Bump:
    """Client for the Bump message API.

    A *secret* is a hyphen separated list of words. Its first
    ``SENDER_LENGTH`` words are sent to the server as the sender id; the
    remaining words are the password used locally to encrypt and decrypt
    message payloads. Secrets are kept in ``self.secrets`` and persisted,
    one per line, in ``self.secrets_file``.
    """

    def __init__(self, secret: str = None) -> None:
        """Set defaults, then store ``secret`` (if given) and load saved secrets."""
        self._initialize_parameters()
        self._save_or_load_secrets(secret)

    def _save_or_load_secrets(self, secret: str = None) -> None:
        """Persist ``secret`` when valid, load all secrets, create one if none exist."""
        self._check_and_save_secret(secret)
        self.secrets = self._load_secrets(secret)
        if self.secrets == []:
            print("you seem to not have a secret in your secrets file! Creating one now...")
            self.generate_secret()

    def _check_secret_valid(self, secret: str) -> bool:
        """Return True when ``secret`` has the expected shape.

        A valid secret consists only of ASCII letters and hyphens, has at
        least ``SENDER_LENGTH + PASSWORD_LENGTH`` hyphen separated words, and
        its password words (everything after the sender words) add up to
        more than 32 letters.
        """
        pattern = re.compile('^[a-zA-Z-]+$')
        words = secret.split('-')
        return bool(
            pattern.match(secret)
            and len(words) >= SENDER_LENGTH + PASSWORD_LENGTH
            and sum(len(word) for word in words[SENDER_LENGTH:]) > 32
        )

    def _initialize_parameters(self) -> None:
        """Set the secrets file location, the API base URL and an empty secret list."""
        self.secrets_file = os.path.join(os.path.expanduser('~'), '.config/bump/secrets_file')
        self.URL = "https://bump.maenle.net/api/"
        self.secrets = []

    def _check_and_save_secret(self, secret: str = None) -> None:
        """Save ``secret`` if it is valid; print a notice if it is not.

        ``None`` is ignored silently.
        """
        if secret is None:
            return
        if self._check_secret_valid(secret):
            self._save_secret(secret)
        else:
            print("--------------")
            print("invalid secret")
            print("--------------")

    def _save_secret(self, secret: str) -> None:
        """Append ``secret`` to the in-memory list and to the secrets file."""
        self.secrets.append(secret)
        directory = os.path.dirname(self.secrets_file)
        if directory:
            # exist_ok avoids the check-then-create race of the explicit test.
            os.makedirs(directory, exist_ok=True)
        with open(self.secrets_file, 'a') as f:
            f.write(secret + '\n')

    def _load_secrets(self, secret: str) -> list:
        """Return the secrets stored in the secrets file, one per line.

        A missing file yields an empty list. ``secret`` is appended to the
        result when it is not ``None`` and not present in ``self.secrets``.
        """
        try:
            with open(self.secrets_file, 'r') as f:
                loaded = f.read().splitlines()
        except FileNotFoundError:
            loaded = []
        # NOTE(review): membership is tested against self.secrets (the
        # in-memory list), not against the lines just read, so a secret that
        # failed validation is still added here -- confirm this is intended.
        if secret is not None and secret not in self.secrets:
            loaded.append(secret)
        return loaded

    def generate_secret(self):
        """Generate a new secret and save it.

        A freshly generated secret can fail validation when its password
        words are too short in total (the check needs more than 32 letters);
        in that case a new one is generated instead of silently saving
        nothing.
        """
        keywords = self._generate_keywords()
        while not self._check_secret_valid(keywords):
            keywords = self._generate_keywords()
        self._save_secret(keywords)

    def _generate_keywords(self) -> str:
        """Build a secret of ``SENDER_LENGTH + PASSWORD_LENGTH`` random words.

        Only purely alphabetic words of 4 to 9 letters are used. Words are
        picked with ``secrets.choice`` because they end up as password
        material.

        Raises:
            RuntimeError: if the word source yields no usable word.
        """
        pattern = re.compile('^[a-zA-Z]+$')
        # Guard against a falsy result from the word source.
        words = r.get_random_words() or []
        candidates = [
            word for word in words
            if pattern.match(word) and 4 <= len(word) < 10
        ]
        if not candidates:
            # Without this check the selection could never finish.
            raise RuntimeError("no usable words available to generate a secret")
        word_count = SENDER_LENGTH + PASSWORD_LENGTH
        return "-".join(secrets.choice(candidates) for _ in range(word_count))

    def show_secret(self) -> None:
        """Print every stored secret as an ASCII QR code followed by its text."""
        print("Scan this QR Code with the Bump app to connect")
        for secret in self.secrets:
            qr = qrcode.QRCode(
                version=1,
                error_correction=qrcode.constants.ERROR_CORRECT_L,
                box_size=10,
                border=4,
            )
            qr.add_data(secret)
            qr.print_ascii()
            print("")
            print(secret)

    def _get_password(self, index: int = 0) -> str:
        """Return the password part (words after the sender words) of a secret."""
        words = self.secrets[index].split('-')
        return "-".join(words[SENDER_LENGTH:])

    def _get_sender(self, index: int = 0) -> str:
        """Return the sender part (first ``SENDER_LENGTH`` words) of a secret."""
        words = self.secrets[index].split('-')
        return "-".join(words[0:SENDER_LENGTH])

    def _encrypt(self, data: str) -> bytes:
        """Encrypt ``data`` with the password of the first secret."""
        password = self._get_password()
        return password_encrypt(data.encode(), password)

    def _decrypt(self, data: str) -> str:
        """Decrypt ``data`` with the password of the first secret and decode it."""
        password = self._get_password()
        return password_decrypt(data, password).decode()

    def push(self, title='', data='') -> None:
        """Post a message with ``title`` and the encrypted ``data`` to "push"."""
        params = {
            'sender': self._get_sender(),
            'title': title,
            'data': self._encrypt(data),
        }
        return self._set_post("push", params)

    def peek(self) -> dict:
        """Query the "peek" endpoint; see ``_get_post`` for the result shape."""
        return self._get_post("peek")

    def pop(self) -> dict:
        """Query the "pop" endpoint; see ``_get_post`` for the result shape."""
        return self._get_post("pop")

    def list(self) -> list:
        """Query the "list" endpoint with ``minutes=2``; see ``_get_post``."""
        params = {
            'minutes': 2,
        }
        return self._get_post("list", params)

    def clear(self) -> None:
        """Query the "clear" endpoint and return the ``_get_post`` result."""
        return self._get_post("clear")

    def delete_senders(self):
        """Forget all secrets in memory and empty the secrets file."""
        self.secrets = []
        directory = os.path.dirname(self.secrets_file)
        if directory:
            os.makedirs(directory, exist_ok=True)
        # Opening for writing truncates the file; nothing has to be written.
        with open(self.secrets_file, 'w'):
            pass

    def delete_sender(self):
        """Post to the "delete_sender" endpoint for the current sender."""
        return self._set_post("delete_sender")

    def _set_post(self, mechanism, add_params=None):
        """POST to ``mechanism`` and print the decoded JSON reply.

        The sender of the first secret is always sent; ``add_params`` is
        merged on top of it. Nothing is returned.
        """
        url = self.URL + mechanism
        params = {
            'sender': self._get_sender(),
        }
        if add_params is not None:
            params.update(add_params)
        messages = requests.post(url, params).json()
        print(messages)

    def _get_post(self, mechanism: str, add_params: dict = None) -> dict:
        """POST to ``mechanism`` and return the reply with payloads decrypted.

        Result shapes:

        * ``[{}]`` when the reply is ``None``, ``{}`` or has an empty
          ``"messages"`` list,
        * the reply dict itself when it has a ``"messages"`` key (each
          message's ``'data'`` is decrypted in place),
        * ``[reply]`` otherwise, with the reply's ``'data'`` decrypted.
        """
        url = self.URL + mechanism
        params = {
            'sender': self._get_sender(),
        }
        if add_params is not None:
            params.update(add_params)
        messages = requests.post(url, params).json()
        if (messages is None or messages == {} or
                ("messages" in messages and messages["messages"] == [])):
            return [{}]
        if "messages" in messages:
            for message in messages.get("messages"):
                message['data'] = self._decrypt(message.get('data'))
            return messages
        messages['data'] = self._decrypt(messages.get('data'))
        return [messages]

    def _load_log(self) -> list:
        """Return the lines of ``.bump_log`` (empty list if it does not exist)."""
        try:
            with open('.bump_log', "r") as f:
                return f.readlines()
        except FileNotFoundError:
            # Nothing has been logged yet, e.g. on the very first run.
            return []

    def _save_log(self, message: str) -> None:
        """Append ``message`` as one JSON line to ``.bump_log``."""
        with open('.bump_log', "a") as f:
            f.write(self._to_log_line(message))

    def _to_log_line(self, message: str) -> str:
        """Serialize ``message`` to the JSON line format used in the log."""
        return json.dumps(message) + "\n"

    def alert(self, sleep_time: int = 1) -> None:
        """Poll ``list()`` forever and print messages not yet in the log.

        Every ``sleep_time`` seconds the recent messages are fetched; each
        non-empty message whose log line is not yet in ``.bump_log`` has its
        ``'data'`` printed and is appended to the log. This method does not
        return.
        """
        log = self._load_log()
        while True:
            time.sleep(sleep_time)
            messages = self.list()
            if isinstance(messages, dict):
                # list() hands back the raw reply dict when it carries a
                # "messages" key; iterate the messages, not the dict keys.
                messages = messages.get("messages", [])
            for message in messages:
                if message != {} and self._to_log_line(message) not in log:
                    print(message['data'])
                    self._save_log(message)
                    log = self._load_log()