common/peers.py
from multiprocessing import Queue
import multiprocessing, os, pickle, select, socket, sys, time, rsa, traceback
from common.safeprint import safeprint
from common.bounty import *
global ext_port
global ext_ip
global port
global myPriv
global myPub
global propQueue
ext_port = -1
ext_ip = ""
port = 44565
myPub, myPriv = rsa.newkeys(1024)
propQueue = multiprocessing.Queue()
seedlist = [("127.0.0.1", 44565), ("localhost", 44565),
("10.132.80.128", 44565)]
peerlist = [("24.10.111.111", 44565)]
remove = []
bounties = []
# constants
peers_file = "data" + os.sep + "peerlist.pickle"
key_request = "Key Request".encode('utf-8')
close_signal = "Close Signal".encode("utf-8")
peer_request = "Requesting Peers".encode("utf-8")
bounty_request = "Requesting Bounties".encode("utf-8")
incoming_bounties = "Incoming Bounties".encode("utf-8")
incoming_bounty = "Incoming Bounty".encode("utf-8")
valid_signal = "Bounty was valid".encode("utf-8")
invalid_signal = "Bounty was invalid".encode("utf-8")
end_of_message = "End of message".encode("utf-8")
sig_length = len(max(
close_signal, peer_request, bounty_request, incoming_bounties,
incoming_bounty, valid_signal, invalid_signal, key=len))
def pad(string):
return string + " ".encode('utf-8') * (sig_length - (((len(string) - 1) % sig_length) + 1))
close_signal = pad(close_signal)
peer_request = pad(peer_request)
bounty_request = pad(bounty_request)
incoming_bounties = pad(incoming_bounties)
incoming_bounty = pad(incoming_bounty)
valid_signal = pad(valid_signal)
invalid_signal = pad(invalid_signal)
end_of_message = pad(end_of_message)
signals = [close_signal, peer_request, bounty_request, incoming_bounty, valid_signal, invalid_signal]
def send(msg, conn, key):
while key is None:
safeprint("Key not found. Requesting key")
conn.send(key_request)
try:
key = pickle.loads(conn.recv(1024))
key = rsa.PublicKey(key[0], key[1])
safeprint("Key received")
except EOFError:
continue
if not isinstance(msg, type("a".encode('utf-8'))):
msg = msg.encode('utf-8')
x = 0
while x < len(msg) - 117:
conn.sendall(rsa.encrypt(msg[x:x+117], key))
x += 117
conn.sendall(rsa.encrypt(msg[x:], key))
conn.sendall(rsa.encrypt(end_of_message, key))
return key
def recv(conn):
received = "".encode('utf-8')
a = ""
try:
while True:
a = conn.recv(128)
if a == key_request:
safeprint("Key requested. Sending key")
conn.sendall(pickle.dumps((myPriv.n, myPriv.e), 0))
continue
a = rsa.decrypt(a, myPriv)
safeprint("Packet = " + str(a), verbosity=3)
if a == end_of_message:
return received
received += a
except rsa.pkcs1.DecryptionError as error:
safeprint("Decryption error---Content: " + str(a))
return "".encode('utf-8')
def get_lan_ip():
"""Retrieves the LAN ip. Expanded from http://stackoverflow.com/a/28950776"""
import socket
s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
try:
# doesn't even have to be reachable
s.connect(('8.8.8.8', 23))
IP = s.getsockname()[0]
except:
IP = '127.0.0.1'
finally:
s.close()
return IP
def getFromFile():
"""Load peerlist from a file"""
if os.path.exists(peers_file):
try:
peerlist.extend(pickle.load(open(peers_file, "rb")))
trimPeers()
except:
safeprint("Could not load peerlist from file")
def saveToFile():
"""Save peerlist to a file"""
if not os.path.exists(peers_file.split(os.sep)[0]):
os.mkdir(peers_file.split(os.sep)[0])
pickle.dump(peerlist[:], open(peers_file, "wb"), 0)
def getFromSeeds():
"""Make peer requests to each address on the seedlist"""
for seed in seedlist:
safeprint(seed, verbosity=1)
peerlist.extend(requestPeerlist(seed))
time.sleep(1)
def requestPeerlist(address):
"""Request the peerlist of another node. Currently has additional test commands"""
conn = socket.socket()
conn.settimeout(5)
safeprint(address, verbosity=1)
try:
conn.connect(address)
key = send(peer_request, conn, None)
received = recv(conn)
safeprint(pickle.loads(received), verbosity=2)
if recv(conn) == peer_request:
handlePeerRequest(conn, False, key=key, received=pickle.loads(received))
recv(conn)
conn.close()
return pickle.loads(received)
except Exception as error:
safeprint("Failed:" + str(type(error)))
safeprint(error)
remove.extend([address])
return []
def requestBounties(address):
"""Request the bountylist of another node"""
conn = socket.socket()
conn.settimeout(5)
safeprint(address, verbosity=1)
try:
conn.connect(address)
key = send(bounty_request, conn, None)
received = recv(conn)
if recv(conn) == bounty_request:
handleBountyRequest(conn, False, key=key, received=pickle.loads(received))
safeprint(recv(conn))
conn.close()
addBounties(pickle.loads(received))
except Exception as error:
safeprint("Failed:" + str(type(error)))
safeprint(error)
remove.extend([address])
def initializePeerConnections(newPort, newip, newport):
"""Populate the peer list from a previous session, seeds, and from the peer list if its size is less than 12. Then save this new list to a file"""
port = newPort # Does this affect the global variable?
ext_ip = newip # Does this affect the global variable?
ext_port = newport # Does this affect the global variable?
safeprint([ext_ip, ext_port])
getFromFile()
safeprint("peers fetched from file", verbosity=1)
getFromSeeds()
safeprint("peers fetched from seedlist", verbosity=1)
trimPeers()
if len(peerlist) < 12:
safeprint(len(peerlist))
newlist = []
for peer in peerlist:
newlist.extend(requestPeerlist(peer))
peerlist.extend(newlist)
trimPeers()
safeprint("getting bounties from peers and seeds", verbosity=1)
for peer in peerlist[:] + seedlist[:]:
requestBounties(peer)
safeprint("peer network extended", verbosity=1)
saveToFile()
safeprint("peer network saved to file", verbosity=1)
safeprint(peerlist)
safeprint([ext_ip, ext_port])
def trimPeers():
"""Trim the peerlist to a single set, and remove any that were marked as erroneous before"""
temp = list(set(peerlist[:]))
for peer in remove:
try:
del temp[temp.index(peer)]
except:
continue
del remove[:]
del peerlist[:]
peerlist.extend(temp)
def listen(port, outbound, q, v, serv):
"""BLOCKING function which should only be run in a daemon thread. Listens and responds to other nodes"""
if serv:
from server.bounty import verify, addBounty
server = socket.socket()
server.bind(("0.0.0.0", port))
server.listen(10)
server.settimeout(5)
if sys.version_info[0] < 3 and sys.platform == "win32":
server.setblocking(True)
global ext_ip, ext_port
if outbound is True:
safeprint("UPnP mode is disabled")
else:
safeprint("UPnP mode is enabled")
if not portForward(port):
outbound = True
safeprint([outbound, ext_ip, ext_port])
q.put([outbound, ext_ip, ext_port])
while v.value: # is True is implicit
safeprint("listening on " + str(get_lan_ip()) + ":" + str(port), verbosity=3)
if not outbound:
safeprint("forwarded from " + ext_ip + ":" + str(ext_port), verbosity=3)
try:
conn, addr = server.accept()
server.setblocking(True)
conn.setblocking(True)
safeprint("connection accepted")
packet = recv(conn)
safeprint("Received: " + packet.decode(), verbosity=3)
key = None
if packet == peer_request:
key = handlePeerRequest(conn, True, key=key)
elif packet == bounty_request:
key = handleBountyRequest(conn, True, key=key)
elif packet == incoming_bounty:
key = handleIncomingBounty(conn, key=key)
send(close_signal, conn, key)
conn.close()
server.settimeout(5)
safeprint("connection closed")
except Exception as error:
safeprint("Failed: " + str(type(error)))
safeprint(error)
traceback.print_exc()
def handlePeerRequest(conn, exchange, key=None, received=[]):
"""Given a socket, send the proper messages to complete a peer request"""
if ext_port != -1:
unfiltered = peerlist[:] + [((ext_ip, ext_port), myPub.n, myPub.e)]
unfiltered = peerlist[:]
filtered = list(set(unfiltered) - set(received))
safeprint("Unfiltered: " + str(unfiltered), verbosity=3)
safeprint("Filtered: " + str(filtered), verbosity=3)
toSend = pickle.dumps(filtered, 0)
safeprint("Sending")
key = send(toSend, conn, key)
if exchange:
send(peer_request, conn, key)
received = recv(conn)
safeprint("Received exchange", verbosity=1)
safeprint(pickle.loads(received), verbosity=3)
peerlist.extend(pickle.loads(received))
trimPeers()
return key
def handleBountyRequest(conn, exchange, key=None, received=[]):
"""Given a socket, send the proper messages to complete a bounty request"""
unfiltered = getBountyList()
filtered = list(set(unfiltered) - set(received))
toSend = pickle.dumps(filtered, 0)
safeprint("Sending")
key = send(toSend, conn, key)
if exchange:
send(bounty_request, conn, key)
received = recv(conn)
safeprint("Received exchange")
try:
safeprint(pickle.loads(received), verbosity=2)
bounties = pickle.loads(received)
valids = addBounties(bounties)
toSend = []
for i in range(len(bounties)):
if valids[i] >= 0: # If the bounty is valid and not a duplicate, add it to propagation list
toSend.append(bounties[i])
propQueue.put((incoming_bounties, toSend))
except Exception as error:
safeprint("Could not add bounties")
safeprint(type(error))
traceback.print_exc()
# later add function to request without charity bounties
return key
def handleIncomingBounty(conn, key=None):
"""Given a socket, store an incoming bounty & report it valid or invalid"""
received = recv(conn)
safeprint("Adding bounty: " + received.decode())
try:
valid = addBounty(received)
if valid >= -1: # If valid, even if a duplicate, send valid signal
safeprint("Sending valid signal")
send(valid_signal, conn, key)
if valid >= 0: # If valid and not already received, propagate
propQueue.put((incoming_bounty, received))
else:
send(invalid_signal, conn, key)
except Exception as error:
send(invalid_signal, conn, key)
safeprint("Incoming failed: " + str(type(error)))
safeprint(error)
traceback.print_exc()
return key
def propagate(tup):
try:
conn = socket.socket()
address = tup[1]
conn.connect(address)
key = send(incoming_bounty, conn, None)
send(pickle.dumps(tup[0], 0), conn, key)
recv(conn)
conn.close()
except socket.error as Error:
safeprint("Connection to " + str(address) + " failed; cannot propagate")
def portForward(port):
"""Attempt to forward a port on your router to the specified local port. Prints lots of debug info."""
try:
import miniupnpc
u = miniupnpc.UPnP(None, None, 200, port)
# Begin Debug info
safeprint('inital(default) values :')
safeprint(' discoverdelay' + str(u.discoverdelay))
safeprint(' lanaddr' + str(u.lanaddr))
safeprint(' multicastif' + str(u.multicastif))
safeprint(' minissdpdsocket' + str(u.minissdpdsocket))
safeprint('Discovering... delay=%ums' % u.discoverdelay)
safeprint(str(u.discover()) + 'device(s) detected')
# End Debug info
u.selectigd()
global ext_ip
ext_ip = u.externalipaddress()
safeprint("external ip is: " + str(ext_ip))
for i in range(0, 20):
try:
safeprint("Port forward try: " + str(i), verbosity=1)
if u.addportmapping(port+i, 'TCP', get_lan_ip(), port, 'Bounty Net', ''):
global ext_port
ext_port = port + i
safeprint("External port is " + str(ext_port))
return True
except Exception as error:
safeprint("Failed: " + str(type(error)))
safeprint(error)
except Exception as error:
safeprint("Failed: " + str(type(error)))
safeprint(error)
return False
def listenp(port, v):
"""BLOCKING function which should only be run in a daemon thread. Listens and responds to other nodes"""
import time
while v.value: # is True is implicit
safeprint("listenp-ing", verbosity=3)
try:
while propQueue.empty() and v.value:
time.sleep(0.01)
packet = propQueue.get()
safeprint("Received: " + str(packet), verbosity=3)
if packet[0] == incoming_bounty:
bounty = pickle.loads(packet[1])
if bounty.isValid():
from multiprocessing.pool import ThreadPool
ThreadPool().map(propagate, [(bounty, x) for x in peerlist[:]])
elif packet[0] == incoming_bounties:
for bounty in packet[1]:
if bounty.isValid():
from multiprocessing.pool import ThreadPool
ThreadPool().map(propagate, [(bounty, x) for x in peerlist[:]])
safeprint("Packet processed")
except Exception as error:
safeprint("Failed: " + str(type(error)))
safeprint(error)
def sync(items):
if items.get('config'):
from common import settings
settings.config = items.get('config')
if items.get('peerList'):
global peerlist
peerList = items.get('peerList')
if items.get('bountyList'):
from common import bounty
bounty.bountyList = items.get('bountyList')
if items.get('bountyLock'):
from common import bounty
bounty.bountyLock = items.get('bountyLock')
if items.get('propQueue'):
global propQueue
propQueue = items.get('propQueue')
class listener(multiprocessing.Process): # pragma: no cover
"""A class to deal with the listener method"""
def __init__(self, port, outbound, q, v, serv):
multiprocessing.Process.__init__(self)
self.outbound = outbound
self.port = port
self.q = q
self.v = v
self.serv = serv
def run(self):
safeprint("listener started")
sync(self.items)
listen(self.port, self.outbound, self.q, self.v, self.serv)
safeprint("listener stopped")
class propagator(multiprocessing.Process): # pragma: no cover
"""A class to deal with the listener method"""
def __init__(self, port, v):
multiprocessing.Process.__init__(self)
self.port = port
self.v = v
def run(self):
safeprint("propagator started")
sync(self.items)
listenp(self.port, self.v)
safeprint("propagator stopped")