import time
import json
import random
import qrcode
import requests
import base64
import re
import os
import typer
import secrets
from base64 import urlsafe_b64encode as b64e, urlsafe_b64decode as b64d
from typing import Any, Dict, List, Optional, Union

from random_word import RandomWords
from cryptography.fernet import Fernet
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC

# A secret is a dash-separated list of words: the first SENDER_LENGTH words
# form the sender id, the remaining words form the encryption password.
SENDER_LENGTH = 4
PASSWORD_LENGTH = 8

# Seconds to wait for the HTTP API before giving up instead of hanging forever.
REQUEST_TIMEOUT = 30

r = RandomWords()
backend = default_backend()
iterations = 100_000

# Shape returned by Bump._get_post: either the decoded JSON object or a
# one-element list wrapping a single message (see that method's docstring).
_PostResult = Union[Dict[str, Any], List[Dict[str, Any]]]


def _derive_key(password: bytes, salt: bytes, iterations: int = iterations) -> bytes:
    """Derive a secret key from a given password and salt.

    Uses PBKDF2-HMAC-SHA256 with a 32-byte output and returns the key
    urlsafe-base64 encoded, which is the form ``Fernet`` expects.
    """
    kdf = PBKDF2HMAC(
        algorithm=hashes.SHA256(),
        length=32,
        salt=salt,
        iterations=iterations,
        backend=backend,
    )
    return b64e(kdf.derive(password))


def password_encrypt(message: bytes, password: str, iterations: int = iterations) -> bytes:
    """Encrypt ``message`` with a key derived from ``password``.

    The returned token is the urlsafe-base64 encoding of
    ``salt (16 bytes) + iterations (4 bytes, big-endian) + raw Fernet token``,
    so ``password_decrypt`` can recover the salt and iteration count.
    """
    salt = secrets.token_bytes(16)
    key = _derive_key(password.encode(), salt, iterations)
    return b64e(
        b'%b%b%b' % (
            salt,
            iterations.to_bytes(4, 'big'),
            b64d(Fernet(key).encrypt(message)),
        )
    )


def password_decrypt(token: bytes, password: str) -> bytes:
    """Decrypt a token produced by ``password_encrypt`` and return the plaintext bytes."""
    decoded = b64d(token)
    salt = decoded[:16]
    iteration_bytes = decoded[16:20]
    # Fernet wants its token base64 encoded again.
    fernet_token = b64e(decoded[20:])
    iteration_count = int.from_bytes(iteration_bytes, 'big')
    key = _derive_key(password.encode(), salt, iteration_count)
    return Fernet(key).decrypt(fernet_token)


class Bump:
    """Client for the bump HTTP API that encrypts message data with a locally stored secret."""

    def __init__(self, secret: Optional[str] = None) -> None:
        """Set up defaults, then store ``secret`` (if given) and load all known secrets."""
        self._initialize_parameters()
        self._save_or_load_secrets(secret)

    def _save_or_load_secrets(self, secret: Optional[str] = None) -> None:
        """Persist ``secret`` when valid, load the secrets list, and create one if it is empty."""
        self._check_and_save_secret(secret)
        self.secrets = self._load_secrets(secret)
        if self.secrets == []:
            print("you seem to not have a secret in your secrets file! Creating one now...")
            self.generate_secret()

    def _check_secret_valid(self, secret: str) -> bool:
        """Return True when ``secret`` has the expected shape.

        A valid secret contains only ASCII letters and dashes, has at least
        ``SENDER_LENGTH + PASSWORD_LENGTH`` dash-separated words, and its
        password words (everything after the sender words) total more than
        32 characters.
        """
        pattern = re.compile('^[a-zA-Z-]+$')
        words = secret.split('-')
        return bool(
            pattern.match(secret)
            and len(words) >= SENDER_LENGTH + PASSWORD_LENGTH
            and sum(len(word) for word in words[SENDER_LENGTH:]) > 32
        )

    def _initialize_parameters(self) -> None:
        """Initialise the secrets file path, the API base URL and the in-memory secrets list."""
        self.secrets_file = os.path.join(os.path.expanduser('~'), '.config/bump/secrets_file')
        self.URL = "https://bump.maenle.net/api/"
        self.secrets = []

    def _check_and_save_secret(self, secret: Optional[str] = None) -> None:
        """Save ``secret`` if it validates; print a notice if it does not; ignore ``None``."""
        if secret is None:
            return
        if self._check_secret_valid(secret):
            self._save_secret(secret)
        else:
            print("--------------")
            print("invalid secret")
            print("--------------")

    def _read_secrets_file(self) -> List[str]:
        """Return the lines of the secrets file, or an empty list when it does not exist."""
        try:
            with open(self.secrets_file, 'r') as f:
                return f.read().splitlines()
        except FileNotFoundError:
            return []

    def _save_secret(self, secret: str) -> None:
        """Remember ``secret`` in memory and append it to the secrets file.

        The file is only written when the secret is not already stored, so
        constructing ``Bump`` repeatedly with the same secret does not keep
        growing the file with duplicate lines.
        """
        self.secrets.append(secret)
        if secret in self._read_secrets_file():
            return
        os.makedirs(os.path.dirname(self.secrets_file), exist_ok=True)
        with open(self.secrets_file, 'a+') as f:
            f.write(secret + '\n')

    def _load_secrets(self, secret: Optional[str]) -> List[str]:
        """Return the secrets stored on disk, plus ``secret`` when it was not saved.

        NOTE(review): ``secret`` is only missing from ``self.secrets`` here when
        it failed validation, so a rejected secret is still appended to the
        in-memory list - confirm whether that is intended.
        """
        loaded = self._read_secrets_file()
        if secret is not None and secret not in self.secrets:
            loaded.append(secret)
        return loaded

    def generate_secret(self):
        """Generate a new random secret and store it.

        A freshly generated secret can fail validation when all password
        words are very short, so generation is retried a bounded number of
        times before handing the result to ``_check_and_save_secret``.
        """
        keywords = self._generate_keywords()
        for _ in range(9):
            if self._check_secret_valid(keywords):
                break
            keywords = self._generate_keywords()
        self._check_and_save_secret(keywords)

    def _generate_keywords(self) -> str:
        """Build a dash-separated secret of ``SENDER_LENGTH + PASSWORD_LENGTH`` random words.

        Only purely alphabetic words of 4 to 9 characters are used. Words are
        drawn (with repetition allowed) using the ``secrets`` module because
        the result is key material.

        Raises:
            RuntimeError: if the word source yields no usable word.
        """
        pattern = re.compile('^[a-zA-Z]+$')
        words = r.get_random_words()
        candidates = [
            word for word in (words or [])
            if pattern.match(word) and 4 <= len(word) < 10
        ]
        if not candidates:
            raise RuntimeError("no suitable words available to generate a secret")
        return "-".join(
            secrets.choice(candidates)
            for _ in range(SENDER_LENGTH + PASSWORD_LENGTH)
        )

    def show_secret(self) -> None:
        """Print every known secret as an ASCII QR code followed by the secret text."""
        print("Scan this QR Code with the Bump app to connect")
        for secret in self.secrets:
            qr = qrcode.QRCode(
                version=1,
                error_correction=qrcode.constants.ERROR_CORRECT_L,
                box_size=10,
                border=4,
            )
            qr.add_data(secret)
            qr.print_ascii()
            print("")
            print(secret)

    def _get_password(self, index: int = 0) -> str:
        """Return the password part (words after the sender words) of secret ``index``."""
        words = self.secrets[index].split('-')
        return "-".join(words[SENDER_LENGTH:])

    def _get_sender(self, index: int = 0) -> str:
        """Return the sender part (first ``SENDER_LENGTH`` words) of secret ``index``."""
        words = self.secrets[index].split('-')
        return "-".join(words[0:SENDER_LENGTH])

    def _encrypt(self, data: str) -> bytes:
        """Encrypt ``data`` with the password of the first secret; returns the token bytes."""
        password = self._get_password()
        return password_encrypt(data.encode(), password)

    def _decrypt(self, data: str) -> str:
        """Decrypt a token with the password of the first secret and decode it to text."""
        password = self._get_password()
        return password_decrypt(data, password).decode()

    def push(self, title='', data='') -> None:
        """Send a message with ``title`` and encrypted ``data`` via the ``push`` endpoint."""
        params = {
            'sender': self._get_sender(),
            'title': title,
            'data': self._encrypt(data),
        }
        return self._set_post("push", params)

    def peek(self) -> _PostResult:
        """Call the ``peek`` endpoint and return its decrypted result."""
        return self._get_post("peek")

    def pop(self) -> _PostResult:
        """Call the ``pop`` endpoint and return its decrypted result."""
        return self._get_post("pop")

    def list(self) -> _PostResult:
        """Call the ``list`` endpoint with ``minutes=2`` and return its decrypted result."""
        params = {'minutes': 2}
        return self._get_post("list", params)

    def clear(self) -> _PostResult:
        """Call the ``clear`` endpoint and return whatever ``_get_post`` yields for it."""
        return self._get_post("clear")

    def delete_senders(self):
        """Forget all secrets in memory and truncate the secrets file."""
        self.secrets = []
        os.makedirs(os.path.dirname(self.secrets_file), exist_ok=True)
        with open(self.secrets_file, 'w+') as f:
            f.write("")

    def delete_sender(self):
        """Call the ``delete_sender`` endpoint for the current sender."""
        return self._set_post("delete_sender")

    def _set_post(self, mechanism, add_params=None):
        """POST the sender plus ``add_params`` to ``mechanism`` and print the JSON response."""
        url = self.URL + mechanism
        params = {'sender': self._get_sender()}
        if add_params is not None:
            params.update(add_params)
        messages = requests.post(url, data=params, timeout=REQUEST_TIMEOUT).json()
        print(messages)

    def _decrypt_in_place(self, message: Dict[str, Any]) -> None:
        """Replace ``message['data']`` with its plaintext when the entry carries data."""
        # Decrypting a missing payload can only fail, so leave such entries untouched.
        if message.get('data') is not None:
            message['data'] = self._decrypt(message.get('data'))

    def _get_post(self, mechanism: str, add_params: Optional[dict] = None) -> _PostResult:
        """POST to ``mechanism`` and return the response with message data decrypted.

        Returns:
            ``[{}]`` when the response is empty, ``None`` or has an empty
            ``messages`` list; the response dict itself when it has a
            non-empty ``messages`` list; otherwise a one-element list
            wrapping the response.
        """
        url = self.URL + mechanism
        params = {'sender': self._get_sender()}
        if add_params is not None:
            params.update(add_params)
        messages = requests.post(url, data=params, timeout=REQUEST_TIMEOUT).json()
        if (messages == {} or messages is None
                or ("messages" in messages and messages["messages"] == [])):
            return [{}]
        if "messages" in messages:
            for message in messages.get("messages"):
                self._decrypt_in_place(message)
            return messages
        self._decrypt_in_place(messages)
        return [messages]

    def _load_log(self) -> List[str]:
        """Return the lines of ``.bump_log``, or an empty list when no log exists yet."""
        try:
            with open('.bump_log', "r") as f:
                return f.readlines()
        except FileNotFoundError:
            return []

    def _save_log(self, message: str) -> None:
        """Append ``message`` to ``.bump_log`` as one JSON line."""
        with open('.bump_log', "a") as f:
            f.write(self._to_log_line(message))

    def _to_log_line(self, message: str) -> str:
        """Serialise ``message`` as JSON followed by a newline."""
        return json.dumps(message) + "\n"

    def alert(self, sleep_time: int = 1) -> None:
        """Poll ``list`` forever, printing and logging each message not seen before.

        ``sleep_time`` is the pause in seconds before every poll.
        """
        seen = set(self._load_log())
        while True:
            time.sleep(sleep_time)
            result = self.list()
            # ``list`` yields a dict with a 'messages' key when there are
            # messages, but a plain list ([{}]) when there are none.
            if isinstance(result, dict):
                entries = result.get('messages', [])
            else:
                entries = result
            for message in entries:
                if message == {}:
                    continue
                line = self._to_log_line(message)
                if line in seen:
                    continue
                print({'title': message['title'], 'data': message['data']})
                self._save_log(message)
                seen.add(line)