#!/usr/bin/env python3
# -*- coding: utf-8 -*-
import asyncio
import collections
import collections.abc
import os
import re
import shlex
import sys
from concurrent.futures import FIRST_COMPLETED

from libnmap.parser import NmapParser
# Patterns used to interpret nmap's textual output.  Raw strings are used so
# that the regex escapes (``\.``, ``\(``) are not treated as (invalid) Python
# string escapes; the compiled patterns are unchanged.

# A stderr line that is only a warning (matched case-insensitively).
regex_warning = re.compile(r'^Warning: .*', re.IGNORECASE)
# The banner line printed by ``nmap -V``.
regex_nmap_version = re.compile(r'Nmap version [0-9]*\.[0-9]*[^ ]* \( https?://.* \)')
# First run of digits in the banner line (major version).
regex_version = re.compile(r'[0-9]+')
# First ".digits" group in the banner line (minor version, with leading dot).
regex_subversion = re.compile(r'\.[0-9]+')
class PortScannerBase(object):
    """Common machinery for locating the nmap executable and running it.

    The executable is looked up lazily: the first coroutine that needs it
    runs ``<candidate> -V`` for each entry of ``nmap_search_path`` until one
    prints an nmap version banner.  Scans are run with ``-oX -`` so that the
    XML report arrives on stdout, which is then handed to
    ``NmapParser.parse_fromstring``.
    """

    # Prompt passed to ``sudo -p``; sudo echoes it on stderr, where it is
    # recognised and discarded so it is not mistaken for an nmap error.
    _SUDO_PROMPT = 'xxxxx'

    def __init__(self, nmap_search_path=(
            'nmap', '/usr/bin/nmap', '/usr/local/bin/nmap', '/sw/bin/nmap', '/opt/local/bin/nmap')):
        """Remember the candidate nmap locations; nothing is executed yet.

        :param nmap_search_path: iterable of executable names/paths to try,
            in order, when the nmap binary is first needed.
        """
        self._nmap_path = ''  # resolved nmap executable; empty until probed
        self._scan_result = {}
        self._nmap_version_number = 0  # major version parsed from ``nmap -V``
        self._nmap_subversion_number = 0  # minor version parsed from ``nmap -V``
        self._nmap_search_path = nmap_search_path

    @staticmethod
    async def _reap(proc):
        """Terminate ``proc`` if it is still running and wait for it to exit."""
        try:
            proc.terminate()
        except ProcessLookupError:
            # The process has already exited; only the wait() is still needed.
            pass
        await proc.wait()

    async def _probe_nmap(self, nmap_path):
        """Run ``nmap_path -V`` and record path/version if it is nmap.

        :returns: ``True`` when a version banner was seen, ``False`` when the
            output ended without one.
        """
        proc = await asyncio.create_subprocess_exec(
            nmap_path, '-V', stdout=asyncio.subprocess.PIPE)
        try:
            while True:
                line = (await proc.stdout.readline()).decode('utf8')
                if line and regex_nmap_version.match(line) is not None:
                    self._nmap_path = nmap_path
                    major = regex_version.search(line)
                    minor = regex_subversion.search(line)
                    if major is not None and minor is not None:
                        self._nmap_version_number = int(major.group())
                        # Drop the leading dot of the ".NN" match.
                        self._nmap_subversion_number = int(minor.group()[1:])
                    return True
                if proc.stdout.at_eof():
                    return False
        finally:
            await self._reap(proc)

    async def _ensure_nmap_path_and_version(self):
        """Resolve the nmap executable and its version once.

        Does nothing when a path has already been resolved.

        :raises NmapError: if no candidate in the search path is nmap.
        """
        if self._nmap_path:
            return
        for candidate in self._nmap_search_path:
            try:
                if await self._probe_nmap(candidate):
                    return
            except Exception:
                # Best effort: a missing, non-executable or garbled candidate
                # just means "try the next one".  ``Exception`` (rather than a
                # bare except) lets task cancellation propagate on Pythons
                # where CancelledError is a BaseException.
                continue
        raise NmapError('nmap program was not found in path')

    async def nmap_version(self):
        """Return ``(version, subversion)`` of the resolved nmap binary.

        :raises NmapError: if nmap cannot be found.
        """
        await self._ensure_nmap_path_and_version()
        return (self._nmap_version_number, self._nmap_subversion_number)

    async def listscan(self, hosts='127.0.0.1', dns_lookup=True, sudo=False, sudo_passwd=None):
        """Run a list scan (``-sL``) over ``hosts`` and return the parsed report.

        :param hosts: host specification, a string or an iterable of strings.
        :param dns_lookup: when false, ``-n`` is added to the nmap arguments.
        :param sudo: run nmap through ``sudo``.
        :param sudo_passwd: password fed to sudo; required when ``sudo`` is set.
        :raises NmapError: if nmap cannot be found or its output cannot be parsed.
        """
        await self._ensure_nmap_path_and_version()
        nmap_args = self._get_scan_args(hosts, None, arguments='-sL' if dns_lookup else '-sL -n')
        return await self._scan_proc(*nmap_args, sudo=sudo, sudo_passwd=sudo_passwd)

    def analyse_nmap_xml_scan(self, nmap_xml_output=None, nmap_err='', nmap_err_keep_trace='', nmap_warn_keep_trace=''):
        """Parse nmap XML output and attach the collected stderr traces.

        :param nmap_xml_output: XML text produced by ``nmap -oX -``.
        :param nmap_err: stderr text of the nmap run.
        :param nmap_err_keep_trace: value stored as ``errors`` on the report.
        :param nmap_warn_keep_trace: value stored as ``warnings`` on the report.
        :returns: the object returned by ``NmapParser.parse_fromstring``.
        :raises NmapError: if parsing fails; carries ``nmap_err`` when it is
            non-empty, otherwise the raw output.
        """
        try:
            report = NmapParser.parse_fromstring(nmap_xml_output)
            report.__dict__['errors'] = nmap_err_keep_trace
            report.__dict__['warnings'] = nmap_warn_keep_trace
            return report
        except Exception as exc:
            if nmap_err:
                raise NmapError(nmap_err) from exc
            raise NmapError(nmap_xml_output) from exc

    async def _scan_proc(self, *nmap_args, sudo=False, sudo_passwd=None):
        """Run nmap with ``nmap_args`` and return the parsed report.

        :param nmap_args: command-line arguments placed after the nmap path.
        :param sudo: run the command as ``sudo -S -p <prompt> nmap ...``.
        :param sudo_passwd: password written to sudo's stdin.
        :raises NmapError: if ``sudo`` is set without ``sudo_passwd``, or if
            the output cannot be parsed.
        """
        if sudo and not sudo_passwd:
            raise NmapError("sudo must with 'sudo_passwd' argument")

        if sudo:
            command = ('sudo', '-S', '-p', self._SUDO_PROMPT, self._nmap_path) + tuple(nmap_args)
            stdin = asyncio.subprocess.PIPE
            stdin_data = sudo_passwd.encode() + b"\n"
        else:
            command = (self._nmap_path,) + tuple(nmap_args)
            stdin = None
            stdin_data = None

        proc = await asyncio.create_subprocess_exec(
            *command, stdin=stdin,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE)
        try:
            nmap_output, nmap_err = await proc.communicate(stdin_data)
        finally:
            await self._reap(proc)

        # stderr consisting solely of the sudo prompt is not an error.
        if sudo and nmap_err.strip() == self._SUDO_PROMPT.encode():
            nmap_err = b''
        nmap_err = nmap_err.decode('utf8')
        nmap_output = nmap_output.decode('utf8')

        nmap_err_keep_trace = []
        nmap_warn_keep_trace = []
        for line in nmap_err.split(os.linesep):
            if not line:
                continue
            if regex_warning.search(line) is not None:
                nmap_warn_keep_trace.append(line + os.linesep)
            else:
                # NOTE(review): the complete stderr text (not just this line)
                # is appended for every non-warning line; kept as-is because
                # consumers of ``errors`` may rely on it -- confirm intent.
                nmap_err_keep_trace.append(nmap_err)

        return self.analyse_nmap_xml_scan(
            nmap_xml_output=nmap_output,
            nmap_err=nmap_err,
            nmap_err_keep_trace=nmap_err_keep_trace,
            nmap_warn_keep_trace=nmap_warn_keep_trace
        )

    def _get_scan_args(self, hosts, ports, arguments):
        """Build the nmap argument list for a scan.

        :param hosts: string, or iterable of strings joined with spaces.
        :param ports: string, iterable of ports joined with commas, or ``None``.
        :param arguments: extra nmap options as one shell-style string; must
            not contain ``-oX`` or ``-oA``.
        :returns: ``['-oX', '-', *hosts, ('-p', ports)?, *arguments]``.
        """
        assert isinstance(hosts, (str, collections.abc.Iterable)), \
            'Wrong type for [hosts], should be a string or Iterable [was {0}]'.format(type(hosts))
        assert isinstance(ports, (str, collections.abc.Iterable, type(None))), \
            'Wrong type for [ports], should be a string or Iterable [was {0}]'.format(type(ports))  # noqa
        assert isinstance(arguments, str), \
            'Wrong type for [arguments], should be a string [was {0}]'.format(type(arguments))  # noqa

        if not isinstance(hosts, str):
            hosts = ' '.join(hosts)

        # XML always goes to stdout (-oX -); a second output option would clash.
        assert all(_ not in arguments for _ in ('-oX', '-oA')), 'Xml output can\'t be redirected from command line'

        if ports and not isinstance(ports, str):
            ports = ','.join(str(port) for port in ports)

        hosts_args = shlex.split(hosts)
        scan_args = shlex.split(arguments)
        port_args = ['-p', ports] if ports else []
        return ['-oX', '-'] + hosts_args + port_args + scan_args
class PortScanner(PortScannerBase):
    """Scanner that runs one nmap invocation and keeps the last result."""

    async def scan(self, hosts='127.0.0.1', ports=None, arguments='-sV', sudo=False, sudo_passwd=None):
        """Scan ``hosts`` and return the parsed result.

        The result is also stored on the instance as the latest scan result.

        :param hosts: host specification, a string or an iterable of strings.
        :param ports: ports as a string, an iterable, or ``None``.
        :param arguments: extra nmap options as a single string.
        :param sudo: run nmap through ``sudo``.
        :param sudo_passwd: password fed to sudo; required when ``sudo`` is set.
        """
        # Native coroutine: the generator-based ``@asyncio.coroutine`` form is
        # no longer available on current Python versions.
        await self._ensure_nmap_path_and_version()
        nmap_args = self._get_scan_args(hosts, ports, arguments)
        self._scan_result = await self._scan_proc(*nmap_args, sudo=sudo, sudo_passwd=sudo_passwd)
        return self._scan_result

    def __getitem__(self, host):
        """
        returns a host detail
        """
        # NOTE(review): this indexes the stored result as result['scan'][host];
        # scan() stores whatever _scan_proc returns, which may not support
        # this dict-style lookup -- confirm against the parser's report type.
        return self._scan_result['scan'][host]
# True on Python 3.5 and later.  The classes below use ``async def`` syntax.
gt_py35 = sys.version_info >= (3, 5)
if gt_py35:
    class PortScannerIterable(object):
        """Asynchronous iterator that scans hosts in concurrent batches.

        On the first step the host specification is expanded with
        ``scanner.listscan``; each resulting address is then scanned with
        ``scanner._scan_proc``, keeping up to ``batch_count`` scans in flight.
        Every item produced is either the value returned by a scan or, when
        that scan raised, the exception object itself (returned, not raised).
        """

        def __init__(self, scanner, hosts, args, batch_count=3, sudo=False, sudo_passwd=None):
            """
            :param scanner: object providing ``listscan`` and ``_scan_proc``.
            :param hosts: host specification passed to ``listscan``.
            :param args: nmap arguments appended after each address.
            :param batch_count: maximum number of scans running at once.
            :param sudo: forwarded to the scanner calls.
            :param sudo_passwd: forwarded to the scanner calls.
            """
            self._scanner = scanner
            self._hosts = hosts
            self._args = args
            self._futs = set()  # scans currently in flight
            self._batch_count = batch_count
            self._stop_ip_gen = False  # True once every address has been scheduled
            self._done_fut_gen = None  # iterator over finished-but-unreturned scans
            self._stopped = False  # True once nothing is left to wait for
            self._started = False
            self.sudo = sudo
            self.sudo_passwd = sudo_passwd

        def _done_fu_generator(self, done_futs):
            """Yield the finished futures one by one."""
            yield from done_futs

        def _ip_generator(self, ip_list):
            """Yield the addresses to scan one by one."""
            yield from ip_list

        def _get_result(self):
            """Return the outcome of the next finished scan.

            :raises StopIteration: when the current batch of finished scans
                has been fully consumed.
            """
            fut = next(self._done_fut_gen)
            exception = fut.exception()
            return exception if exception is not None else fut.result()

        def __aiter__(self):
            # Must be a plain method: ``async for`` requires __aiter__ to
            # return the asynchronous iterator itself, not an awaitable.
            return self

        def _fill_future(self):
            """Schedule scans until ``batch_count`` are in flight or addresses run out."""
            while len(self._futs) < self._batch_count:
                try:
                    address = next(self._ip_gen)
                except StopIteration:
                    self._stop_ip_gen = True
                    return
                self._futs.add(asyncio.ensure_future(
                    self._scanner._scan_proc(address, *self._args, sudo=self.sudo,
                                             sudo_passwd=self.sudo_passwd)))

        async def __anext__(self):
            """Return the next scan outcome, waiting for one if necessary.

            :raises StopAsyncIteration: when every address has been scanned
                and every outcome returned, or when there is nothing to scan.
            """
            if not self._started:
                self._started = True
                list_scan = await self._scanner.listscan(self._hosts, False, sudo=self.sudo,
                                                         sudo_passwd=self.sudo_passwd)
                ip_list = [host.address for host in list_scan.hosts] if list_scan else []
                if not ip_list:
                    # Nothing to scan: end the iteration (and stay ended).
                    self._stopped = True
                    raise StopAsyncIteration()
                self._ip_gen = self._ip_generator(ip_list)
                self._fill_future()
            elif self._done_fut_gen is not None:
                # Hand out results already collected by the previous wait().
                try:
                    return self._get_result()
                except StopIteration:
                    self._done_fut_gen = None

            if self._stopped or not self._futs:
                # asyncio.wait() rejects an empty set, so stop cleanly instead.
                self._stopped = True
                raise StopAsyncIteration()

            done, pending = await asyncio.wait(self._futs, return_when=FIRST_COMPLETED)
            self._done_fut_gen = self._done_fu_generator(done)
            self._futs = set(pending)
            if not self._stop_ip_gen:
                self._fill_future()
            if not self._futs:
                # No scan left running and none left to schedule.
                self._stopped = True
            return self._get_result()
class PortScannerYield(PortScannerBase):
    """Scanner whose ``scan`` hands back an asynchronous iterator of results."""

    def scan(self, hosts, ports, arguments, batch_count=3, sudo=False, sudo_passwd=None):
        """Prepare a batched scan of ``hosts``; nothing runs until iterated.

        :param hosts: host specification, passed through to the iterator.
        :param ports: ports as a string, an iterable, or ``None``.
        :param arguments: extra nmap options as a single string.
        :param batch_count: forwarded to the iterator.
        :param sudo: forwarded to the iterator.
        :param sudo_passwd: forwarded to the iterator.
        :returns: a ``PortScannerIterable`` bound to this scanner.
        """
        # Hosts are left empty here: only the port/option part of the command
        # line is built, and the iterator receives ``hosts`` separately.
        shared_args = self._get_scan_args('', ports, arguments)
        return PortScannerIterable(
            self,
            hosts,
            shared_args,
            batch_count,
            sudo,
            sudo_passwd)
class NmapError(Exception):
    """
    Error raised by the port scanner classes in this module.

    The object passed to the constructor is kept on ``value``.
    """

    def __init__(self, value):
        super(NmapError, self).__init__(value)
        self.value = value

    def __repr__(self):
        return 'NmapError exception {0}'.format(self.value)

    def __str__(self):
        # str() shows the repr of the payload (quotes included for strings).
        return repr(self.value)
|