#!@PYTHON@
# This file is part of GNUnet.
# (C) 2010, 2017, 2018 Christian Grothoff (and other contributing authors)
#
# GNUnet is free software: you can redistribute it and/or modify it
# under the terms of the GNU Affero General Public License as published
# by the Free Software Foundation, either version 3 of the License,
# or (at your option) any later version.
#
# GNUnet is distributed in the hope that it will be useful, but
# WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
# Affero General Public License for more details.
#
# You should have received a copy of the GNU Affero General Public License
# along with this program. If not, see https://www.gnu.org/licenses/.
#
# SPDX-License-Identifier: AGPL-3.0-or-later
#
# Functions for integration testing
from __future__ import unicode_literals
from __future__ import print_function
from builtins import object
from builtins import str
import os
import subprocess
import sys
import shutil
import time
from gnunet_pyexpect import pexpect
class Check(object):
    """A group of conditions that must all hold.

    Conditions are objects providing ``check()``, ``evaluate(failed_only)``
    and a ``fulfilled`` attribute.
    """

    def __init__(self, test):
        """Create an empty check.

        ``test`` is only stored on the instance; this class never reads it.
        """
        self.fulfilled = False
        self.conditions = list()
        self.test = test

    def add(self, condition):
        """Append ``condition`` to the list of conditions to be checked."""
        self.conditions.append(condition)

    def run(self):
        """Poll every condition once.

        All conditions are polled even if an earlier one already failed.
        The outcome is stored in ``self.fulfilled`` so that ``evaluate()``
        can report it later.

        Returns True if no condition returned False, otherwise False.
        """
        fulfilled = True
        for c in self.conditions:
            # Only an explicit False counts as a failure.
            if c.check() == False:
                fulfilled = False
        self.fulfilled = fulfilled
        return fulfilled

    def run_blocking(self, timeout, pos_cont, neg_cont):
        """Poll the conditions about once per second until they all hold.

        At most ``timeout`` polls are made; with ``timeout <= 0`` the
        conditions are not polled at all and the call counts as timed out.
        On success ``pos_cont(self)`` is called, on timeout a message is
        printed and ``neg_cont(self)`` is called. Either callback may be
        None, in which case it is skipped.

        Returns True on success, False on timeout.
        """
        execs = 0
        res = False
        while (not res) and (execs < timeout):
            res = self.run()
            if res:
                # No need to wait another second once everything holds.
                break
            time.sleep(1)
            execs += 1
        if res:
            if pos_cont is not None:
                pos_cont(self)
        else:
            print(('Check had timeout after ' + str(timeout) + ' seconds'))
            if neg_cont is not None:
                neg_cont(self)
        return res

    def run_once(self, pos_cont, neg_cont):
        """Poll the conditions a single time without waiting.

        Calls ``pos_cont(self)`` on success or ``neg_cont(self)`` on
        failure; a callback that is None is skipped.

        Returns the result of ``run()``.
        """
        res = self.run()
        if res:
            if pos_cont is not None:
                pos_cont(self)
        elif neg_cont is not None:
            neg_cont(self)
        return res

    def evaluate(self, failed_only):
        """Let every condition report itself and print a summary line.

        ``failed_only`` is passed through to each condition's
        ``evaluate()``.

        Returns ``self.fulfilled``, i.e. the outcome of the most recent
        ``run()`` (False if the check was never run or has been reset).
        """
        pos = 0
        neg = 0
        for c in self.conditions:
            if c.evaluate(failed_only) == False:
                neg += 1
            else:
                pos += 1
        print((str(pos) + ' out of ' + str(pos + neg) + ' conditions fulfilled'))
        return self.fulfilled

    def reset(self):
        """Mark the check and all of its conditions as not fulfilled."""
        self.fulfilled = False
        for c in self.conditions:
            c.fulfilled = False
class Condition(object):
    """Base class for conditions; on its own it is never fulfilled."""

    def __init__(self, type='generic'):
        """Create an unfulfilled condition.

        ``type`` is a label used when the condition is reported; it
        defaults to ``'generic'``. (The class used to define two
        ``__init__`` methods, of which only the second took effect, so the
        default was unreachable.)
        """
        self.fulfilled = False
        self.type = type

    def check(self):
        """Poll the condition; the base implementation always fails."""
        return False

    def evaluate(self, failed_only):
        """Print the state of this condition and return ``self.fulfilled``.

        With ``failed_only == True`` the line is printed only when the
        condition is not fulfilled; with ``failed_only == False`` it is
        always printed.
        """
        unfulfilled_report = (self.fulfilled == False) and (failed_only == True)
        if unfulfilled_report or (failed_only == False):
            print(str(self.type) + 'condition for was ' + str(self.fulfilled))
        return self.fulfilled
class FileExistCondition(Condition):
    """Condition that is fulfilled once a regular file exists at a path."""

    def __init__(self, file):
        """Remember the path ``file``; the condition starts unfulfilled."""
        self.file = file
        self.type = 'file'
        self.fulfilled = False

    def check(self):
        """Return True if the file exists now or was seen on an earlier poll.

        Once fulfilled, the file system is not consulted again.
        """
        if self.fulfilled != False:
            return True
        if os.path.isfile(self.file):
            self.fulfilled = True
            return True
        return False

    def evaluate(self, failed_only):
        """Print the state of this condition and return ``self.fulfilled``.

        With ``failed_only == True`` the line is printed only when the
        condition is not fulfilled; with ``failed_only == False`` it is
        always printed.
        """
        unfulfilled_report = (self.fulfilled == False) and (failed_only == True)
        if unfulfilled_report or (failed_only == False):
            summary = (str(self.type) + 'condition for file ' + self.file +
                       ' was ' + str(self.fulfilled))
            print(summary)
        return self.fulfilled
class StatisticsCondition(Condition):
    """Condition fulfilled when a peer's statistics value equals a target."""

    def __init__(self, peer, subsystem, name, value):
        """Set up the condition.

        ``peer`` must provide ``get_statistics_value(subsystem, name)`` and
        the attributes ``id`` and ``cfg``; ``value`` is the expected value.
        ``self.result`` holds the last value read, or -1 if none was read.
        """
        self.fulfilled = False
        self.type = 'statistics'
        self.peer = peer
        self.subsystem = subsystem
        self.name = name
        self.value = value
        self.result = -1

    @staticmethod
    def _text(value):
        """Return ``value`` as text, decoding byte strings as UTF-8."""
        if isinstance(value, bytes):
            return value.decode('utf-8', 'replace')
        return str(value)

    def check(self):
        """Read the statistics value and compare it with the expected one.

        Both sides are compared as text. A byte string returned by the peer
        is decoded first; calling ``str()`` on bytes under Python 3 yields
        the ``b'...'`` repr, which would never match the expected value.
        Once fulfilled, the peer is not queried again.
        """
        if self.fulfilled == False:
            self.result = self.peer.get_statistics_value(self.subsystem,
                                                         self.name)
            if self._text(self.result) == self._text(self.value):
                self.fulfilled = True
                return True
            return False
        return True

    def evaluate(self, failed_only):
        """Print expected and actual value and return ``self.fulfilled``.

        With ``failed_only == True`` the line is printed only when the
        condition is not fulfilled; with ``failed_only == False`` it is
        always printed. A result of -1 is shown as ``NaN``.
        """
        if self.result == -1:
            res = 'NaN'
        else:
            res = self._text(self.result)
        if self.fulfilled == False:
            fail = " FAIL!"
            op = " != "
        else:
            fail = ""
            op = " == "
        unfulfilled_report = (self.fulfilled == False) and (failed_only == True)
        if unfulfilled_report or (failed_only == False):
            # Build the line as text so it prints readably on Python 3 and
            # also works while peer.id is still the initial empty string.
            line = (self._text(self.peer.id[:4]) +
                    " " +
                    self._text(self.peer.cfg) +
                    " " +
                    str(self.type) +
                    ' condition in subsystem "' +
                    self._text(self.subsystem).ljust(12) +
                    '" : "' +
                    self._text(self.name).ljust(30) +
                    '" : (expected/real value) ' +
                    self._text(self.value) +
                    op +
                    res +
                    fail)
            print(line)
        return self.fulfilled
# Specify two statistic values and check if they are equal
class EqualStatisticsCondition(Condition):
    """Condition fulfilled when two statistics values are equal."""

    def __init__(self, peer, subsystem, name, peer2, subsystem2, name2):
        """Set up the condition for two (peer, subsystem, name) triples.

        Each peer must provide ``get_statistics_value(subsystem, name)``
        and an ``id`` attribute. ``self.result`` and ``self.result2`` hold
        the last values read, or -1 if none was read.
        """
        self.fulfilled = False
        self.type = 'equalstatistics'
        self.peer = peer
        self.subsystem = subsystem
        self.name = name
        self.result = -1
        self.peer2 = peer2
        self.subsystem2 = subsystem2
        self.name2 = name2
        self.result2 = -1

    @staticmethod
    def _text(value):
        """Return ``value`` as text, decoding byte strings as UTF-8."""
        if isinstance(value, bytes):
            return value.decode('utf-8', 'replace')
        return str(value)

    def check(self):
        """Read both statistics values and compare them as text.

        Once fulfilled, the peers are not queried again.
        """
        if self.fulfilled == False:
            self.result = self.peer.get_statistics_value(self.subsystem,
                                                         self.name)
            self.result2 = self.peer2.get_statistics_value(self.subsystem2,
                                                           self.name2)
            # NOTE(review): two failed lookups (-1 and -1) also compare
            # equal and fulfil the condition -- confirm this is intended.
            if self._text(self.result) == self._text(self.result2):
                self.fulfilled = True
                return True
            return False
        return True

    def evaluate(self, failed_only):
        """Print both values and return ``self.fulfilled``.

        With ``failed_only == True`` the line is printed only when the
        condition is not fulfilled; with ``failed_only == False`` it is
        always printed. A result of -1 is shown as ``NaN`` and an
        unfulfilled condition is marked with ``FAIL!``.
        """
        if self.result == -1:
            res = 'NaN'
        else:
            res = self._text(self.result)
        if self.result2 == -1:
            res2 = 'NaN'
        else:
            res2 = self._text(self.result2)
        if self.fulfilled == False:
            fail = " FAIL!"
        else:
            fail = ""
        unfulfilled_report = (self.fulfilled == False) and (failed_only == True)
        if unfulfilled_report or (failed_only == False):
            # Build the line as text so it prints readably on Python 3 and
            # also works while a peer id is still the initial empty string.
            line = (self._text(self.peer.id[:4]) +
                    ' "' +
                    self._text(self.subsystem).ljust(12) +
                    '" "' +
                    self._text(self.name).ljust(30) +
                    '" == ' +
                    res +
                    " " +
                    self._text(self.peer2.id[:4]) +
                    ' "' +
                    self._text(self.subsystem2).ljust(12) +
                    '" "' +
                    self._text(self.name2).ljust(30) +
                    '" ' +
                    res2 +
                    fail)
            print(line)
        return self.fulfilled
class Test(object):
    """Per-test context: tool names, registered peers and verbosity."""

    def __init__(self, testname, verbose):
        """Prepare a test run called ``testname``.

        Adds ``../../contrib/scripts`` to ``sys.path`` if missing, selects
        the platform-specific names of the GNUnet command line tools and
        removes a left-over temporary directory named after the test
        (errors during removal are ignored).
        """
        self.peers = list()
        self.verbose = verbose
        self.name = testname
        srcdir = "../.."
        gnunet_pyexpect_dir = os.path.join(srcdir, "contrib/scripts")
        if gnunet_pyexpect_dir not in sys.path:
            sys.path.append(gnunet_pyexpect_dir)
        # Always define all three tool names so that they exist (as empty
        # strings) even on a platform that is neither posix nor nt.
        self.gnunetarm = ''
        self.gnunetstatistics = ''
        self.gnunetpeerinfo = ''
        if os.name == 'posix':
            self.gnunetarm = 'gnunet-arm'
            self.gnunetstatistics = 'gnunet-statistics'
            self.gnunetpeerinfo = 'gnunet-peerinfo'
        elif os.name == 'nt':
            self.gnunetarm = 'gnunet-arm.exe'
            self.gnunetstatistics = 'gnunet-statistics.exe'
            self.gnunetpeerinfo = 'gnunet-peerinfo.exe'
        if os.name == "nt":
            tempdir = os.getenv("TEMP")
            # Without TEMP there is no well-defined directory to clean up;
            # skipping also avoids removing a path relative to the cwd.
            if tempdir:
                shutil.rmtree(os.path.join(tempdir, testname), True)
        else:
            shutil.rmtree("/tmp/" + testname, True)

    def add_peer(self, peer):
        """Register ``peer`` with this test."""
        self.peers.append(peer)

    def p(self, msg):
        """Print ``msg`` if the test was created with ``verbose`` set."""
        if self.verbose == True:
            print(msg)
class Peer(object):
    """A GNUnet peer controlled through the command line tools of a test."""

    def __init__(self, test, cfg_file):
        """Create a peer description; the peer itself is not started.

        ``test`` supplies the tool names (``gnunetarm``, ``gnunetpeerinfo``,
        ``gnunetstatistics``) and the ``p()`` logging helper. A missing
        ``cfg_file`` is reported on stdout but is not treated as an error.
        """
        # Assign the attributes first so that __del__ and stop() can rely
        # on them even if the file check below raises.
        self.id = ""
        self.test = test
        self.started = False
        self.cfg = cfg_file
        if not os.path.isfile(cfg_file):
            print(("Peer cfg " + cfg_file + ": FILE NOT FOUND"))

    def __del__(self):
        """Stop the peer if it is still marked as started.

        Returns the result of ``stop()`` if the peer was still started,
        otherwise False.
        """
        if getattr(self, 'started', False) == True:
            print('ERROR! Peer using cfg ' + self.cfg + ' was not stopped')
            ret = self.stop()
            if ret == False:
                print('ERROR! Peer using cfg ' +
                      self.cfg +
                      ' could not be stopped')
                self.started = False
            return ret
        return False

    @staticmethod
    def _parse_peer_id(output):
        """Extract the text between the first backtick and the next quote.

        Presumably the tool prints something like "I am peer `ID'." --
        verify against gnunet-peerinfo. Returns None if ``output`` is not a
        byte string or contains no backtick.
        """
        if not isinstance(output, bytes):
            return None
        parts = output.split(b'`')
        if len(parts) < 2:
            return None
        return parts[1].split(b'\'')[0]

    @staticmethod
    def _parse_statistics_value(output):
        """Return the first line of ``output`` if it is all digits, else -1.

        A trailing carriage return is removed first, because GNUnet on W32
        ends lines with CR LF. ``endswith`` is used because indexing a byte
        string yields an int on Python 3 and fails on empty input.
        """
        if not isinstance(output, bytes):
            return -1
        first_line = output.partition(b'\n')[0]
        if first_line.endswith(b'\r'):
            first_line = first_line[:-1]
        if first_line.isdigit():
            return first_line
        return -1

    def start(self):
        """Start the peer and read its identity into ``self.id``.

        Returns False if the arm tool could not be launched (the peer is
        then marked as not started). Also returns False if the peer
        identity could not be read or parsed; in that case the peer stays
        marked as started, so the caller should still call ``stop()``.
        Returns True on success.
        """
        self.test.p("Starting peer using cfg " + self.cfg)
        try:
            server = subprocess.Popen([self.test.gnunetarm, '-sq', '-c', self.cfg])
            server.communicate()
        except OSError:
            print("Can not start peer")
            self.started = False
            return False
        self.started = True
        output = None
        try:
            server = pexpect()
            server.spawn(None, [self.test.gnunetpeerinfo, '-c', self.cfg, '-s'], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
            output = server.read("stdout", 1024)
        except OSError:
            # Reported once below, together with unparsable output.
            output = None
        peer_id = self._parse_peer_id(output)
        if peer_id is None:
            print("Can not get peer identity")
            return False
        self.id = peer_id
        return True

    def stop(self):
        """Stop the peer.

        Returns False if the peer was not started or the arm tool could not
        be launched, True otherwise.
        """
        if self.started == False:
            return False
        self.test.p("Stopping peer using cfg " + self.cfg)
        try:
            server = subprocess.Popen([self.test.gnunetarm, '-eq', '-c', self.cfg])
            server.communicate()
        except OSError:
            print("Can not stop peer")
            return False
        self.started = False
        return True

    def get_statistics_value(self, subsystem, name):
        """Query one statistics value of this peer.

        Runs the statistics tool for ``subsystem``/``name`` and returns the
        first output line as a byte string if it consists only of digits,
        otherwise -1.
        """
        server = pexpect()
        server.spawn(None, [self.test.gnunetstatistics, '-c', self.cfg, '-q', '-n', name, '-s', subsystem], stdout=subprocess.PIPE, stderr=subprocess.STDOUT)
        output = server.read("stdout", 10240)
        return self._parse_statistics_value(output)