1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
"""Test the pre-kill hook on Darwin."""
from __future__ import print_function
# system imports
from multiprocessing import Process, Queue
import platform
import re
from unittest import main, TestCase
# third party
from six import StringIO
def do_child_process(child_work_queue, parent_work_queue, verbose):
    """Run the child side of the parent/child handshake used by the test.

    The child reports its own pid on ``parent_work_queue``, then blocks until
    one item arrives on ``child_work_queue`` and returns.

    Args:
        child_work_queue: queue the child waits on; a single item is consumed
            from it and its value is ignored.
        parent_work_queue: queue on which this process's pid is put.
        verbose: when true, progress messages are printed.
    """
    import os

    def log(message):
        # Progress chatter is only emitted in verbose mode.
        if verbose:
            print(message)

    my_pid = os.getpid()
    log("child: pid {} started, sending to parent".format(my_pid))
    parent_work_queue.put(my_pid)

    log("child: waiting for shut-down request from parent")
    child_work_queue.get()
    log("child: received shut-down request. Child exiting.")
class DarwinPreKillTestCase(TestCase):
    """Run the Darwin pre-kill hook against a live child process.

    The test spawns a child process, calls ``do_pre_kill`` with the child's
    pid and checks the text written by the hook. ``tearDown`` asks the child
    to exit and waits for it.
    """

    def __init__(self, methodName="runTest"):
        """Initialise per-test state.

        Args:
            methodName: name of the test method to run; the default mirrors
                the ``unittest.TestCase`` constructor.
        """
        super(DarwinPreKillTestCase, self).__init__(methodName)
        # Set by test_sample once the child has actually been started.
        self.process = None
        # Queue used to tell the child to shut down.
        self.child_work_queue = None
        # Flip to True to print progress messages from parent and child.
        self.verbose = False

    def tearDown(self):
        """Ask the child (if one was started) to exit and wait for it."""
        if self.verbose:
            print("parent: sending shut-down request to child")
        if self.process:
            # The child blocks on its queue; any item releases it.
            self.child_work_queue.put("hello, child")
            self.process.join()
        if self.verbose:
            print("parent: child is fully shut down")

    def test_sample(self):
        """Run do_pre_kill on a child process and validate its output.

        Skipped unless ``platform.system()`` reports ``'Darwin'``.
        """
        # Ensure we're Darwin.
        if platform.system() != 'Darwin':
            self.skipTest("requires a Darwin-based OS")

        # Start the child process.
        self.child_work_queue = Queue()
        parent_work_queue = Queue()
        child = Process(target=do_child_process,
                        args=(self.child_work_queue, parent_work_queue,
                              self.verbose))
        if self.verbose:
            print("parent: starting child")
        child.start()
        # Record the process only after a successful start, so tearDown never
        # tries to join a process that was never started.
        self.process = child

        # Wait for the child to report its pid.  Then we know we're running.
        if self.verbose:
            print("parent: waiting for child to start")
        child_pid = parent_work_queue.get()

        # Sample the child process.
        from darwin import do_pre_kill
        context_dict = {
            "archs": [platform.machine()],
            "platform_name": None,
            "platform_url": None,
            "platform_working_dir": None
        }

        if self.verbose:
            print("parent: running pre-kill action on child")
        output_io = StringIO()
        do_pre_kill(child_pid, context_dict, output_io)
        output = output_io.getvalue()

        if self.verbose:
            print("parent: do_pre_kill() wrote the following output:", output)
        self._assert_sample_output(output, child_pid)

    def _assert_sample_output(self, output, child_pid):
        """Check that ``output`` has the expected sections for ``child_pid``.

        Args:
            output: text written by the pre-kill hook.
            child_pid: pid expected inside the brackets of the ``Process:``
                line.
        """
        self.assertIsNotNone(output)

        # We should have a line with:
        # Process:  .* [{pid}]
        process_re = re.compile(r"Process:[^[]+\[([^]]+)\]")
        match = process_re.search(output)
        self.assertIsNotNone(match, "should have found process id for "
                             "sampled process")
        self.assertEqual(1, len(match.groups()))
        self.assertEqual(child_pid, int(match.group(1)))

        # We should see a Call graph: section.
        callgraph_re = re.compile(r"Call graph:")
        match = callgraph_re.search(output)
        self.assertIsNotNone(match, "should have found the Call graph section "
                             "in sample output")

        # We should see a Binary Images: section.
        binary_images_re = re.compile(r"Binary Images:")
        match = binary_images_re.search(output)
        self.assertIsNotNone(match, "should have found the Binary Images "
                             "section in sample output")
# Run the tests in this module when the file is executed as a script.
if __name__ == "__main__":
    main()
|