c2f95e4b5122946ae119eb685356da3a2aa49246
[oweals/gnunet.git] / src / dht / test_dht_tools.py.in
1 #!@PYTHON@
2 #
3 # This testcase simply checks that the DHT command-line tools work.
4 # It launches a single peer, stores a value "testdata" under "testkey",
5 # and then gives the system 50 ms to fetch it.
6 #
7 # This could fail if
8 # - command line tool interfaces fail
9 # - DHT plugins for storage are not installed / working
10 # - block plugins for verification (the test plugin) is not installed
11 #
12 # The code does NOT depend on DHT routing or any actual P2P functionality.
13 #
14
15 import os
16 import sys
17 import shutil
18 import re
19 import subprocess
20 import time
21 import tempfile
22
23 os.environ["PATH"] = "@bindir@" + ":" + os.environ["PATH"]
24
25 if os.name == "nt":
26     tmp = os.getenv("TEMP")
27 else:
28     tmp = "/tmp"
29
30 if os.name == 'nt':
31     get = './gnunet-dht-get.exe'
32     put = './gnunet-dht-put.exe'
33     arm = 'gnunet-arm.exe'
34 else:
35     get = './gnunet-dht-get'
36     put = './gnunet-dht-put'
37     arm = 'gnunet-arm'
38
39 cfgfile = 'test_dht_api_peer1.conf'
40 run_get = [get, '-c', cfgfile]
41 run_put = [put, '-c', cfgfile]
42 run_arm = [arm, '-c', cfgfile]
43 debug = os.getenv('DEBUG')
44 if debug:
45     run_arm += [debug.split(' ')]
46
47
48 def cleanup(exitcode):
49     sys.exit(exitcode)
50
51
52 def sub_run(args, want_stdo=True, want_stde=False, nofail=False):
53     if want_stdo:
54         stdo = subprocess.PIPE
55     else:
56         stdo = None
57     if want_stde:
58         stde = subprocess.PIPE
59     else:
60         stde = None
61     p = subprocess.Popen(args, stdout=stdo, stderr=stde)
62     stdo, stde = p.communicate()
63     if not nofail:
64         if p.returncode != 0:
65             sys.exit(p.returncode)
66     return (p.returncode, stdo, stde)
67
68
69 def fail(result):
70     print(result)
71     r_arm(['-e'], want_stdo=False)
72     cleanup(1)
73
74
75 def r_something(to_run, extra_args, failer=None, normal=True, **kw):
76     rc, stdo, stde = sub_run(to_run + extra_args, nofail=True, **kw)
77     if failer is not None:
78         failer(to_run + extra_args, rc, stdo, stde, normal)
79     return (rc, stdo, stde)
80
81
82 def r_arm(extra_args, **kw):
83     return r_something(run_arm, extra_args, **kw)
84
85
86 def r_get(extra_args, **kw):
87     return r_something(run_get, extra_args, **kw)
88
89
90 def r_put(extra_args, **kw):
91     return r_something(run_put, extra_args, **kw)
92
93
94 def end_arm_failer(command, rc, stdo, stde, normal):
95     if normal:
96         if rc != 0:
97             fail(
98                 "FAIL: error running {}\nCommand output was:\n{}\n{}".format(
99                     command, stdo, stde
100                 )
101             )
102     else:
103         if rc == 0:
104             fail(
105                 "FAIL: expected error while running {}\nCommand output was:\n{}\n{}"
106                 .format(command, stdo, stde)
107             )
108
109
110 def print_only_failer(command, rc, stdo, stde, normal):
111     if normal:
112         if rc != 0:
113             print(
114                 "FAIL: error running {}\nCommand output was:\n{}\n{}".format(
115                     command, stdo, stde
116                 )
117             )
118             cleanup(1)
119     else:
120         if rc == 0:
121             print(
122                 "FAIL: expected error while running {}\nCommand output was:\n{}\n{}"
123                 .format(command, stdo, stde)
124             )
125             cleanup(1)
126
127
128 print("TEST: Starting ARM...", end='')
129 r_arm(['-s'], failer=end_arm_failer, want_stdo=False, want_stde=False)
130 print("PASS")
131 time.sleep(1)
132
133 print("TEST: Testing put...", end='')
134 r_put(['-k', 'testkey', '-d', 'testdata', '-t', '8'], failer=end_arm_failer)
135 print("PASS")
136 time.sleep(1)
137
138 print("TEST: Testing get...", end='')
139 rc, stdo, stde = r_get(['-k', 'testkey', '-T', '50 ms', '-t', '8'],
140                        want_stdo=True,
141                        failer=end_arm_failer)
142 stdo = stdo.decode('utf-8').replace('\r', '').splitlines()
143 expect = "Result 0, type 8:\ntestdata".splitlines()
144 if len(stdo) != 2 or len(expect
145                          ) != 2 or stdo[0] != expect[0] or stdo[1] != expect[1]:
146     fail("output `{}' differs from expected `{}'".format(stdo, expect))
147 print("PASS")
148
149 r_arm(['-e', '-d'], failer=print_only_failer)