飞道的博客

Python实现端口检测

304人阅读  评论(0)

一、背景:

在平时工作中有遇到端口检测,查看服务端特定端口是否对外开放,常用nmap,tcping,telnet等,同时也可以利用站长工具等web扫描端口等。
但是在使用站长工具发现:

  • 每次只能输入一个检测的地址;
  • 虽然可以输入多个端口,但是不能指定一个端口范围来进行批量检测;
  • 没有批量任务记录日志等;

因避免由于局域网检测发起端网络限制而导致的端口检测异常,未使用python-nmap
想通过调用站长工具,实现

  • 单次可多个地址或域名检测
  • 单次可指定端口范围,批量检测
  • 记录日志

二、代码:

2.1 结构

2.2 代码

github地址

  • info.cfg代码

  
  1. #address = 8.8.8.8
  2. address = www.moguyun.com,www.51cto.com,www.anchnet.com
  3. #检查的端口,如多个端口使用,隔开,端口范围使用'-'
  4. #ports = 80,8080....
  5. ports = 20-25,80,443,1433,1521,3306,3389,6379,8080,27017
  6. #日志配置
  7. [loginfo]
  8. #日志目录
  9. logdir_name = logdir
  10. #日志文件名称
  11. logfile_name = check_port.log
  • logger.py代码

  
  1. #!/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # _auth:kelly
  4. import os
  5. import time
  6. import logging
  7. import configparser
  8. class LogHelper():
  9. """
  10. 初始化logger,读取目录及文件名称
  11. """
  12. def __init__(self):
  13. configoper = configparser.ConfigParser()
  14. configoper.read( 'info.cfg',encoding= 'utf-8')
  15. self.logdir_name = configoper[ 'loginfo'][ 'logdir_name']
  16. self.logfile_name = configoper[ 'loginfo'][ 'logfile_name']
  17. def create_dir(self):
  18. """
  19. 创建目录
  20. :return: 文件名称
  21. """
  22. _LOGDIR = os.path.join(os.path.dirname(__file__), self.logdir_name)
  23. _TIME = time.strftime( '%Y-%m-%d', time.gmtime()) + '-'
  24. _LOGNAME = _TIME + self.logfile_name
  25. LOGFILENAME = os.path.join(_LOGDIR, _LOGNAME)
  26. if not os.path.exists(_LOGDIR):
  27. os.mkdir(_LOGDIR)
  28. return LOGFILENAME
  29. def create_logger(self, logfilename):
  30. """
  31. 创建logger对象
  32. :param logfilename:
  33. :return: logger对象
  34. """
  35. logger = logging.getLogger()
  36. logger.setLevel(logging.INFO)
  37. handler = logging.FileHandler(logfilename)
  38. handler.setLevel(logging.INFO)
  39. formater = logging.Formatter( '%(asctime)s - %(name)s - %(levelname)s - %(message)s')
  40. handler.setFormatter(formater)
  41. logger.addHandler(handler)
  42. return logger
  • tcp_check_port.py 代码

  
  1. #!/bin/env python
  2. # -*- coding:utf-8 -*-
  3. # _auth:kaliarch
  4. import requests
  5. from configparser import ConfigParser
  6. import re
  7. from tcp_port_check import logger
  8. class check_ports():
  9. def __init__(self,logger):
  10. """
  11. 初始化,获取配置文件信息
  12. """
  13. self.url = 'http://tool.chinaz.com/iframe.ashx?t=port'
  14. self.headers = {
  15. 'Accept': 'text/javascript, application/javascript, application/ecmascript, application/x-ecmascript, */*; q=0.01',
  16. 'Accept-Encoding': 'gzip, deflate',
  17. 'Accept-Language': 'zh-CN,zh;q=0.8',
  18. 'Connection': 'keep-alive',
  19. 'Content-Length': '62',
  20. 'Content-Type': 'application/x-www-form-urlencoded; charset=UTF-8',
  21. 'Host': 'tool.chinaz.com',
  22. 'Origin': 'http://tool.chinaz.com',
  23. 'Referer': 'http://tool.chinaz.com/port/',
  24. 'User-Agent': 'Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/61.0.3163.100 Safari/537.36',
  25. 'X-Requested-With': 'XMLHttpRequest'
  26. }
  27. config = ConfigParser()
  28. config.read( 'info.cfg',encoding= 'utf-8')
  29. self.address_list = config[ 'port_check_info'][ 'address']
  30. self.port_list = config[ 'port_check_info'][ 'ports']
  31. #初始化logger
  32. logger = logger.LogHelper()
  33. logname = logger.create_dir()
  34. self.logoper = logger.create_logger(logname)
  35. def _get_body(self):
  36. """
  37. 获取address和port
  38. :return: list
  39. """
  40. address_list = self.address_list.split( ',')
  41. port_list = self.port_list.split( ',')
  42. # 处理端口范围,返回range
  43. range_flag = False
  44. port_range = None
  45. content_List_range = []
  46. for port in port_list:
  47. if '-' in port:
  48. range_flag = True
  49. port_range = range(int(port.split( '-')[ 0]),int(port.split( '-')[ 1])+ 1)
  50. port_list.remove(port)
  51. # 处理总体list
  52. for add in address_list:
  53. if range_flag:
  54. for port in port_range:
  55. content_List_range.append(add + ':' + str(port))
  56. # 合并range和普通list
  57. content_List = [ add+ ':'+port for add in address_list for port in port_list ]
  58. content_List_range.extend(content_List)
  59. return content_List_range
  60. def run(self):
  61. """
  62. 进行端口检测
  63. :return:
  64. """
  65. for content in self._get_body():
  66. content_list = content.split( ':')
  67. body = {
  68. 'host': content_list[ 0],
  69. 'port': content_list[ 1],
  70. 'encode': 'tlCHS1u3IgF4sC57m6KOP3Oaj1Y1kfLq'
  71. }
  72. try:
  73. response = requests.post(url=self.url,data=body,headers=self.headers)
  74. port_status = re.findall( "msg:'(.*?)'", response.text)
  75. if len(port_status) > 0:
  76. print( '%s,port status is:%s' % (content, port_status))
  77. self.logoper.info( '%s,port status is:%s' % (content, port_status))
  78. else:
  79. self.logoper.info( '%s,port status is:%s' % (content, port_status))
  80. print( 'Occer error!请输入正确的地址和端口')
  81. except Exception as e:
  82. self.logoper.info(e)
  83. if __name__ == '__main__':
  84. check_app = check_ports(logger)
  85. check_app.run()

三、测试

3.1 查看检测结果

通过日志可以看出蘑菇云和51cto的80和443端口都是放开的

3.2 查看日志

四、改进

  • 后期可以添加异步多进程等来提升效率
  • 可以对比多个站点检测结果,使结果更准确
  • 整合nmap内网也可检测

转载:https://blog.csdn.net/qq_36441027/article/details/105628425
查看评论
* 以上用户言论只代表其个人观点,不代表本网站的观点或立场