adds pip setup script, updates gitignore
- pip setup.py is provided to allow installs via pip this includes a brief package description and install dependencies - bump is now moved to a subfolder bump with __init__ as this is a package setup requirement - gitignore now ignores any tar'ed files and any pycaches
This commit is contained in:
2
bump/__init__.py
Normal file
2
bump/__init__.py
Normal file
@ -0,0 +1,2 @@
|
||||
from bump.bump import Bump
|
||||
|
219
bump/bump.py
Normal file
219
bump/bump.py
Normal file
@ -0,0 +1,219 @@
|
||||
import time
|
||||
import ipdb
|
||||
import json
|
||||
import random
|
||||
import qrcode
|
||||
import requests
|
||||
import base64
|
||||
import re
|
||||
import os
|
||||
|
||||
|
||||
SENDER_LENGTH = 4
|
||||
PASSWORD_LENGTH = 8
|
||||
|
||||
from random_word import RandomWords
|
||||
|
||||
import secrets
|
||||
from base64 import urlsafe_b64encode as b64e, urlsafe_b64decode as b64d
|
||||
|
||||
from cryptography.fernet import Fernet
|
||||
from cryptography.hazmat.backends import default_backend
|
||||
from cryptography.hazmat.primitives import hashes
|
||||
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
|
||||
|
||||
r = RandomWords()
|
||||
|
||||
backend = default_backend()
|
||||
iterations = 100_000
|
||||
|
||||
def _derive_key(password: bytes, salt: bytes, iterations: int = iterations) -> bytes:
|
||||
"""Derive a secret key from a given password and salt"""
|
||||
kdf = PBKDF2HMAC(
|
||||
algorithm=hashes.SHA256(), length=32, salt=salt,
|
||||
iterations=iterations, backend=backend)
|
||||
return b64e(kdf.derive(password))
|
||||
|
||||
def password_encrypt(message: bytes, password: str, iterations: int = iterations) -> bytes:
|
||||
salt = secrets.token_bytes(16)
|
||||
key = _derive_key(password.encode(), salt, iterations)
|
||||
return b64e(
|
||||
b'%b%b%b' % (
|
||||
salt,
|
||||
iterations.to_bytes(4, 'big'),
|
||||
b64d(Fernet(key).encrypt(message)),
|
||||
)
|
||||
)
|
||||
|
||||
def password_decrypt(token: bytes, password: str) -> bytes:
|
||||
decoded = b64d(token)
|
||||
salt, iter, token = decoded[:16], decoded[16:20], b64e(decoded[20:])
|
||||
iterations = int.from_bytes(iter, 'big')
|
||||
key = _derive_key(password.encode(), salt, iterations)
|
||||
return Fernet(key).decrypt(token)
|
||||
|
||||
class Bump:
|
||||
def __init__(self, secret=None, secrets_file=None):
|
||||
if None == secrets_file:
|
||||
secrets_file = os.path.join(os.path.expanduser('~'), '.config/bump/secrets_file')
|
||||
self.secrets = self._load_secrets(secret, secrets_file)
|
||||
if self.secrets == []:
|
||||
print("you seem to not have a secret in your secrets file! Creating one now...")
|
||||
self.generate_secret(secrets_file)
|
||||
self.show_secret()
|
||||
|
||||
self.URL = "https://bump.maenle.net/api/"
|
||||
|
||||
|
||||
|
||||
def _load_secrets(self, secret, secrets_file):
|
||||
try:
|
||||
with open(secrets_file, 'r') as f:
|
||||
secrets = f.read().splitlines()
|
||||
except FileNotFoundError:
|
||||
secrets = []
|
||||
if secret is not None and secret not in self.secrets:
|
||||
secrets.append(secret)
|
||||
|
||||
return secrets
|
||||
|
||||
def generate_secret(self, secrets_file):
|
||||
WORDS = r.get_random_words()
|
||||
secret = ""
|
||||
for _ in range(SENDER_LENGTH + PASSWORD_LENGTH):
|
||||
secret += random.choice(WORDS) + "-"
|
||||
while len(secret[SENDER_LENGTH:]) < 32:
|
||||
secret += random.choice(WORDS) + "-"
|
||||
secret = secret[:-1]
|
||||
|
||||
self.secrets.append(secret)
|
||||
|
||||
ipdb.set_trace()
|
||||
if not os.path.exists(os.path.dirname(secrets_file)):
|
||||
os.makedirs(os.path.dirname(secrets_file))
|
||||
|
||||
with open(secrets_file, 'a+') as f:
|
||||
f.write(secret + '\n')
|
||||
|
||||
|
||||
def show_secret(self):
|
||||
print("Scan this QR Code with the Bump app to connect them:")
|
||||
for secret in self.secrets:
|
||||
qr = qrcode.QRCode(
|
||||
version=1,
|
||||
error_correction=qrcode.constants.ERROR_CORRECT_L,
|
||||
box_size=10,
|
||||
border=4,
|
||||
)
|
||||
qr.add_data(secret)
|
||||
qr.print_ascii()
|
||||
print("")
|
||||
print(secret)
|
||||
|
||||
def _get_password(self, index=0):
|
||||
secret = self.secrets[index].split('-')
|
||||
return "-".join(secret[SENDER_LENGTH:])
|
||||
|
||||
|
||||
def _get_sender(self, index=0):
|
||||
words = self.secrets[index].split('-')
|
||||
return "-".join(words[0:SENDER_LENGTH])
|
||||
|
||||
def _encrypt(self, data):
|
||||
password = self._get_password()
|
||||
return password_encrypt(data.encode(), password)
|
||||
|
||||
def _decrypt(self, data):
|
||||
password = self._get_password()
|
||||
return password_decrypt(data, password).decode()
|
||||
|
||||
def push(self, title='', data=''):
|
||||
params = {
|
||||
'sender': self._get_sender(),
|
||||
'title': title,
|
||||
'data': self._encrypt(data)
|
||||
}
|
||||
return self._set_post("push", params)
|
||||
|
||||
def peek(self):
|
||||
return self._get_post("peek")
|
||||
|
||||
def pop(self):
|
||||
return self._get_post("pop")
|
||||
|
||||
def list(self):
|
||||
params = {
|
||||
'minutes': 2
|
||||
}
|
||||
return self._get_post("list", params)
|
||||
|
||||
|
||||
def clear(self):
|
||||
return self._get_post("clear")
|
||||
|
||||
def delete_sender(self):
|
||||
return self._set_post("delete_sender")
|
||||
|
||||
def _set_post(self, mechanism, add_params = None):
|
||||
url = self.URL + mechanism
|
||||
params = {
|
||||
'sender': self._get_sender(),
|
||||
}
|
||||
|
||||
if add_params != None:
|
||||
params.update(add_params)
|
||||
|
||||
messages = requests.post(url, params).json()
|
||||
print(messages)
|
||||
|
||||
def _get_post(self, mechanism, add_params = None):
|
||||
url = self.URL + mechanism
|
||||
|
||||
params = {
|
||||
'sender': self._get_sender()
|
||||
}
|
||||
|
||||
if add_params != None:
|
||||
params.update(add_params)
|
||||
|
||||
messages = requests.post(url, params).json()
|
||||
|
||||
if(messages == {} or messages == None or
|
||||
"messages" in messages and messages["messages"] == []):
|
||||
return [{}]
|
||||
elif "messages" in messages:
|
||||
for message in messages.get("messages"):
|
||||
message['data'] = self._decrypt(message.get('data'))
|
||||
|
||||
return messages
|
||||
else:
|
||||
messages['data'] = self._decrypt(messages.get('data'))
|
||||
return [messages]
|
||||
|
||||
|
||||
def _load_log(self):
|
||||
with open('.bump_log', "r+") as f:
|
||||
return f.readlines()
|
||||
|
||||
def _save_log(self, message):
|
||||
with open('.bump_log', "a") as f:
|
||||
f.write(self._to_log_line(message))
|
||||
|
||||
def _to_log_line(self, message):
|
||||
return json.dumps(message) + "\n"
|
||||
|
||||
def alert(self, sleep_time=1):
|
||||
log = self._load_log()
|
||||
while True:
|
||||
time.sleep(sleep_time)
|
||||
messages = self.list()
|
||||
for message in messages:
|
||||
if self._to_log_line(message) not in log and message != {}:
|
||||
print(message['data'])
|
||||
self._save_log(message)
|
||||
log = self.load_log()
|
||||
|
||||
|
||||
|
||||
if __name__ == '__main__':
|
||||
b = Bump()
|
Reference in New Issue
Block a user