O'zbekiston milliy universiteti qo'lyozma huquqida


Download 6,02 Mb.
Hajmi 6,02 Mb.
1   ...   48   49   50   51   52   53   54   55   56
Dissertatsiya SH.M


import json

import sys

import os

import argparse

import threading

from common.common import *

from common.logger import Log

from common.corscheck import CORSCheck

import gevent

from gevent import monkey


from gevent.pool import Pool

from gevent.queue import Queue

from colorama import init

# Globals

# Findings collected during a scan. main() serialises this list as JSON into
# the output file when one is configured; scan() declares it `global`.
results = []

def parser_error(errmsg):
    """Report a command-line error and terminate the program.

    Installed as a replacement for ``ArgumentParser.error``. argparse relies
    on that hook never returning, so this function exits after printing;
    otherwise parsing would carry on with invalid arguments.

    Args:
        errmsg: Human-readable description of what was wrong.

    Raises:
        SystemExit: Always, with status 2 (the status argparse itself uses
            for usage errors).
    """
    print(("Usage: python " + sys.argv[0] + " [Options] use -h for help"))
    print(("Error: " + errmsg))
    sys.exit(2)


def parse_args():
    """Parse and validate the command-line options.

    Returns:
        argparse.Namespace with ``url``, ``input``, ``threads``, ``output``,
        ``verbose``, ``headers``, ``timeout`` and ``proxy``.

    Raises:
        SystemExit: If neither a URL nor an input file is given, or the
            input file does not exist.
    """
    parser = argparse.ArgumentParser(
        epilog='\tExample: \r\npython ' + sys.argv[0] + " -u google.com")
    parser.error = parser_error
    parser._optionals.title = "OPTIONS"

    parser.add_argument(
        '-u', '--url', help="URL/domain to check it's CORS policy")
    parser.add_argument(
        '-i', '--input',
        help='URL/domain list file to check their CORS policy')
    # NOTE(review): the short flag and default for --threads were lost in the
    # listing; '-t' / 50 are reconstructions -- confirm against the project.
    parser.add_argument(
        '-t', '--threads',
        help='Number of threads to use for CORS scan',
        type=int,
        default=50)
    parser.add_argument('-o', '--output', help='Save the results to json file')
    parser.add_argument(
        '-v', '--verbose',
        help='Enable Verbosity and display results in realtime',
        action='store_true',
        default=False)
    parser.add_argument('-d', '--headers', help='Add headers to the request.', default=None, nargs='*')
    # NOTE(review): short flag '-T' is a reconstruction; the default follows
    # the help text ("default 5 sec").
    parser.add_argument(
        '-T', '--timeout',
        help='Set requests timeout (default 5 sec)',
        type=int,
        default=5)
    parser.add_argument('-p', '--proxy', help='Enable proxy (http or socks5)')

    args = parser.parse_args()

    if not (args.url or args.input):
        parser.error("No url inputed, please add -u or -i option")
        sys.exit(2)  # only reached if the installed error handler returns
    if args.input and not os.path.isfile(args.input):
        parser.error("Input file " + args.input + " not exist.")
        sys.exit(2)  # only reached if the installed error handler returns
    return args

# Synchronize results

# Condition object serving as the lock around updates to the shared `results`
# list (presumably taken by scan() workers -- verify against the callers).
c = threading.Condition()

def scan(cfg):
    """Worker loop: take URLs off the shared queue and check each one.

    Args:
        cfg: Scan configuration dict. Reads ``cfg["logger"]`` and
            ``cfg["queue"]`` here; the whole dict is handed to ``CORSCheck``.

    Findings are appended to the module-level ``results`` list only when an
    output file is configured (``log.filename``), under the ``c`` lock.
    """
    global results
    log = cfg["logger"]
    queue = cfg["queue"]

    while not queue.empty():
        try:
            # Another worker may empty the queue between the check above and
            # this call, hence the timeout rather than blocking forever.
            item = queue.get(timeout=1.0)
            checker = CORSCheck(item, cfg)  # not named cors_check: that is the module-level API
            msg = checker.check_one_by_one()

            # Keeping results to be written to file only if needed
            if log.filename and msg:
                with c:
                    results.append(msg)
        except Exception as e:
            # NOTE(review): the handler body was lost in the listing; report
            # the error and stop this worker -- confirm against the project.
            sys.stderr.write("%s\n" % e)
            break

"""
CORScanner library API interface for other projects to use. This will check
the CORS policy for a given URL. Example usage:

    >>> from CORScanner.cors_scan import cors_check
    >>> ret = cors_check("https://www.instagram.com", None)
    >>> ret
    {'url': 'https://www.instagram.com', 'type': 'reflect_origin',...}
"""


def cors_check(url, headers=None):
    """Check the CORS policy of a single URL (library entry point).

    Args:
        url: URL to test.
        headers: Optional extra request headers, stored in the config under
            "headers".

    Returns:
        Whatever ``CORSCheck.check_one_by_one()`` returns for this URL.
    """
    # Print levels: 0 'DEBUG', 1 'INFO', 2 'WARNING', 3 'ALERT', 4 'disable log'
    silent_log = Log(None, print_level=4)
    settings = {"logger": silent_log, "headers": headers, "timeout": 5}
    checker = CORSCheck(url, settings)
    # Sequential probing; check_all_in_parallel() is the alternative.
    return checker.check_one_by_one()

def main():
    """Command-line entry point: parse options, scan all targets, save results.

    Spawns ``args.threads`` gevent workers running ``scan`` over a shared
    queue, waits for them, and, when an output file was requested, writes
    the collected ``results`` list to it as indented JSON.
    """
    # NOTE(review): a line was lost here; colorama's `init` is imported by
    # this module and Log emits ANSI colour codes, so it is presumably
    # called at start-up -- confirm.
    init()
    args = parse_args()

    queue = Queue()
    log_level = 1 if args.verbose else 2  # 1: INFO, 2: WARNING
    log = Log(args.output, log_level)
    cfg = {"logger": log, "queue": queue, "headers": parse_headers(args.headers),
           "timeout": args.timeout, "proxy": args.proxy}

    read_urls(args.url, args.input, queue)

    sys.stderr.write("Starting CORS scan...(Tips: this may take a while, add -v option to enable debug info)\n")

    try:
        workers = [gevent.spawn(scan, cfg) for _ in range(args.threads)]
        # Spawned greenlets only run once the main greenlet yields to them.
        gevent.joinall(workers)
    except KeyboardInterrupt:
        # Deliberate: fall through so partial results are still written.
        pass

    # Writing results file if output file has been set
    if log.filename:
        with open(log.filename, 'w') as output_file:
            output_file.write(json.dumps(results, indent=4))

    sys.stderr.write("Finished CORS scanning...\n")


# Run the scanner only when executed as a script, not when imported.
if __name__ == '__main__':
    main()

import gevent.monkey


import requests, json, os, inspect, tldextract

from future.utils import iteritems


# Python 3 location first; fall back to the Python 2 module name.
try:
    from urllib.parse import urlparse
except ImportError:
    from urlparse import urlparse

import urllib3


from threading import Thread

class CORSCheck:
    """Probe one URL for CORS misconfigurations.

    Each ``test_*`` method crafts an ``Origin`` value, sends a GET request
    with it and reports a finding when the response's
    ``Access-Control-Allow-Origin`` echoes that origin back.

    Args:
        url: Target URL.
        cfg: Configuration dict. Reads ``cfg["timeout"]`` and
            ``cfg["logger"]`` (an object with ``debug``/``info``/``warning``),
            plus the optional ``"headers"`` and ``"proxy"`` entries.
    """

    url = None
    cfg = None
    headers = None
    timeout = None
    result = {}

    # NOTE(review): the probe lists inside check_one_by_one and
    # check_all_in_parallel were lost in the listing. This order follows the
    # order in which the methods are defined -- confirm against the project,
    # since check_one_by_one stops at the first hit.
    _PROBES = (
        'test_reflect_origin',
        'test_prefix_match',
        'test_suffix_match',
        'test_trust_null',
        'test_include_match',
        'test_not_escape_dot',
        'test_trust_any_subdomain',
        'test_https_trust_http',
        'test_custom_third_parties',
        'test_special_characters_bypass',
    )

    _USER_AGENT = 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_12_5) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/59.0.3071.115 Safari/537.36'

    def __init__(self, url, cfg):
        self.url = url
        self.cfg = cfg
        self.timeout = cfg["timeout"]
        self.all_results = []
        self.result = {}  # per-instance, so the class-level dict is never shared

        if cfg.get("headers") is not None:
            self.headers = cfg["headers"]

        self.proxies = {}
        if cfg.get("proxy") is not None:
            self.proxies = {
                "http": cfg["proxy"],
                "https": cfg["proxy"],
            }

    def send_req(self, url, origin):
        """GET ``url`` with the given ``Origin`` header.

        Returns:
            The response object, or None when the request fails or the final
            URL (after redirects) is on a different registered domain.
        """
        try:
            headers = {
                'Origin': origin,
                # NOTE(review): one header entry was lost in the listing;
                # 'Cache-Control' is a reconstruction -- confirm.
                'Cache-Control': 'no-cache',
                'User-Agent': self._USER_AGENT,
            }
            if self.headers is not None:
                headers.update(self.headers)

            # self-signed cert OK, follow redirections
            resp = requests.get(url, timeout=self.timeout, headers=headers,
                                verify=False, allow_redirects=True, proxies=self.proxies)

            # remove cross-domain redirections, which may cause false results
            first_domain = tldextract.extract(url).registered_domain
            last_domain = tldextract.extract(resp.url).registered_domain
            if first_domain.lower() != last_domain.lower():
                resp = None
        except Exception as e:
            # Best effort: an unreachable target is treated as "no response".
            self.cfg["logger"].debug("request to %s failed: %s" % (url, e))
            resp = None
        return resp

    def get_resp_headers(self, resp):
        """Return the response headers as a dict with lower-cased names, or None."""
        if resp is None:
            return None
        return {k.lower(): v for k, v in resp.headers.items()}

    def check_cors_policy(self, test_module_name, test_origin, test_url):
        """Send one probe and compare the allowed origin with the one sent.

        Returns:
            A finding dict (url, type, credentials, origin, status_code) when
            the server echoes ``test_origin``, otherwise None.
        """
        resp = self.send_req(self.url, test_origin)
        resp_headers = self.get_resp_headers(resp)
        status_code = resp.status_code if resp is not None else None

        if resp_headers is None:
            return None

        allowed = str(resp_headers.get("access-control-allow-origin"))
        if test_origin != "null":
            # Compare scheme and host only; any port is dropped.
            parsed = urlparse(allowed)
            resp_origin = parsed.scheme + "://" + parsed.netloc.split(':')[0]
        else:
            resp_origin = allowed

        # test_origin does not have to be case sensitive
        if test_origin.lower() != resp_origin.lower():
            return None

        credentials = "false"
        if resp_headers.get("access-control-allow-credentials") == "true":
            credentials = "true"

        return {
            "url": test_url,
            "type": test_module_name,
            "credentials": credentials,
            "origin": test_origin,
            "status_code": status_code,
        }

    def is_cors_permissive(self, test_module_name, test_origin, test_url):
        """Run one probe; record and log the finding. Returns True on a hit."""
        msg = self.check_cors_policy(test_module_name, test_origin, test_url)

        if msg is not None:
            self.cfg["logger"].warning(msg)
            self.result = msg
            self.all_results.append(msg)
            return True

        self.cfg["logger"].info("nothing found for {url: %s, origin: %s, type: %s}" % (test_url, test_origin, test_module_name))
        return False

    def _log_start(self, module_name, test_url):
        """Announce the start of a probe."""
        # NOTE(review): the logger call opener was lost in the listing;
        # debug level is a reconstruction -- confirm.
        self.cfg["logger"].debug(
            "Start checking %s for %s" % (module_name, test_url))

    def _any_permissive(self, module_name, origins, test_url):
        """Try each origin in turn; stop at and report the first hit."""
        for test_origin in origins:
            if self.is_cors_permissive(module_name, test_origin, test_url):
                return True
        return False

    def test_reflect_origin(self):
        """Probe with an unrelated origin (evil.com)."""
        module_name = 'reflect_origin'
        test_url = self.url
        parsed = urlparse(test_url)
        test_origin = parsed.scheme + "://" + "evil.com"

        self._log_start(module_name, test_url)
        return self.is_cors_permissive(module_name, test_origin, test_url)

    def test_prefix_match(self):
        """Probe with the target host used as a prefix: <host>.evil.com."""
        module_name = 'prefix_match'
        test_url = self.url
        parsed = urlparse(test_url)
        test_origin = parsed.scheme + "://" + parsed.netloc.split(':')[0] + ".evil.com"

        self._log_start(module_name, test_url)
        return self.is_cors_permissive(module_name, test_origin, test_url)

    def test_suffix_match(self):
        """Probe with the registered domain used as a suffix: evil<domain>."""
        module_name = 'suffix_match'
        test_url = self.url
        parsed = urlparse(test_url)
        sld = tldextract.extract(test_url.strip()).registered_domain
        test_origin = parsed.scheme + "://" + "evil" + sld

        self._log_start(module_name, test_url)
        return self.is_cors_permissive(module_name, test_origin, test_url)

    def test_trust_null(self):
        """Probe with the literal origin "null"."""
        module_name = 'trust_null'
        test_url = self.url
        test_origin = "null"

        self._log_start(module_name, test_url)
        return self.is_cors_permissive(module_name, test_origin, test_url)

    def test_include_match(self):
        """Probe with the registered domain minus its first character."""
        module_name = 'include_match'
        test_url = self.url
        parsed = urlparse(test_url)
        sld = tldextract.extract(test_url.strip()).registered_domain
        test_origin = parsed.scheme + "://" + sld[1:]

        self._log_start(module_name, test_url)
        return self.is_cors_permissive(module_name, test_origin, test_url)

    def test_not_escape_dot(self):
        """Probe with the host's last dot replaced by the letter 'a'."""
        module_name = 'not_escape_dot'
        test_url = self.url
        parsed = urlparse(test_url)
        domain = parsed.netloc.split(':')[0]
        # Replace only the right-most dot: www.example.com -> www.exampleacom
        test_origin = parsed.scheme + "://" + "a".join(domain.rsplit('.', 1))

        self._log_start(module_name, test_url)
        return self.is_cors_permissive(module_name, test_origin, test_url)

    def test_trust_any_subdomain(self):
        """Probe with an arbitrary subdomain of the host: evil.<host>."""
        module_name = 'trust_any_subdomain'
        test_url = self.url
        parsed = urlparse(test_url)
        test_origin = parsed.scheme + "://" + "evil." + parsed.netloc.split(':')[0]

        self._log_start(module_name, test_url)
        return self.is_cors_permissive(module_name, test_origin, test_url)

    def test_https_trust_http(self):
        """For https targets, probe with the same host over plain http."""
        module_name = 'https_trust_http'
        test_url = self.url
        parsed = urlparse(test_url)
        if parsed.scheme != "https":
            # Only meaningful when the target itself is served over https.
            return False

        test_origin = "http://" + parsed.netloc.split(':')[0]

        self._log_start(module_name, test_url)
        return self.is_cors_permissive(module_name, test_origin, test_url)

    def test_custom_third_parties(self):
        """Probe with every origin listed in ../origins.json (key "origins")."""
        module_name = 'custom_third_parties'
        test_url = self.url

        self._log_start(module_name, test_url)

        # Opening origins file
        origins_path = os.path.join(os.path.dirname(os.path.realpath(__file__)), '..%sorigins.json' % os.sep)
        with open(origins_path) as origins_file:
            origins = json.load(origins_file)['origins']

        return self._any_permissive(module_name, origins, test_url)

    def test_special_characters_bypass(self):
        """Probe with <host><special char>.evil.com for a list of characters."""
        module_name = 'special_characters_bypass'
        test_url = self.url
        parsed = urlparse(test_url)
        special_characters = ['_','-','"','{','}','+','^','%60','!','~','`',';','|','&',"'",'(',')','*',',','$','=','+',"%0b"]

        base = parsed.scheme + "://" + parsed.netloc.split(':')[0]
        origins = [base + char + ".evil.com" for char in special_characters]

        self._log_start(module_name, test_url)
        return self._any_permissive(module_name, origins, test_url)

    def check_one_by_one(self):
        """Run the probes sequentially and stop at the first finding.

        Returns:
            The finding dict of the first successful probe, or an empty dict.
        """
        for fname in self._PROBES:
            func = getattr(self, fname)
            # Stop if we found a exploit case.
            if func():
                break
        return self.result

    def check_all_in_parallel(self):
        """Run every probe in its own thread and return all findings as a list."""
        threads = []
        for fname in self._PROBES:
            func = getattr(self, fname)
            t = Thread(target=func)
            t.start()
            threads.append(t)

        for t in threads:
            t.join()

        return self.all_results

import time

import json

import sys

class Log:
    """Console logger for CORS misconfiguration messages.

    Messages whose level is below ``print_level`` are dropped; with the
    levels defined in ``msg_level`` (0-3), a ``print_level`` of 4 therefore
    silences everything. Output goes to ``sys.stdout``.

    Args:
        filename: Path of the results file, or None. Only stored here;
            callers read ``log.filename`` to decide whether to save results.
        print_level: Minimum level that is printed.
        auto_timestamp: 1 to prefix each record with a timestamp and the
            level name, anything else to print the bare message.
    """

    print_level = 0
    msg_level = {0: 'DEBUG', 1: 'INFO', 2: 'WARNING', 3: 'ALERT'}
    auto_timestamp = 1

    def __init__(self, filename, print_level, auto_timestamp=1):
        self.filename = filename
        self.print_level = print_level
        self.auto_timestamp = auto_timestamp
        # close() reads this attribute, so it must always exist.
        self.log = None

    def write(self, msg, level=0, auto_timestamp=1):
        """Print ``msg`` if ``level`` reaches the configured print level.

        The ``auto_timestamp`` parameter is kept for interface
        compatibility; the instance setting decides the format.
        """
        if level < self.print_level:
            return
        try:
            if self.auto_timestamp == 1:
                timestamp = time.strftime("%Y-%m-%d %H:%M:%S",
                                          time.localtime())
                record = "%s %s %s" % (timestamp, self.msg_level[level],
                                       msg)
                sys.stdout.write(record + "\r\n")
            else:
                sys.stdout.write(str(msg) + "\r\n")
        except KeyboardInterrupt:
            # Deliberate: an interrupt while printing must not abort logging.
            pass

    def debug(self, msg):
        """Log at DEBUG level (0)."""
        self.write(msg, 0)

    def info(self, msg):
        """Log at INFO level (1)."""
        self.write(msg, 1)

    def warning(self, msg):
        """Log a finding at WARNING level (2), JSON-encoded and coloured red."""
        record = "Found misconfiguration! " + json.dumps(msg)
        self.write("""%s%s%s""" % ('\033[91m', record, '\033[0m'), 2)

    def alert(self, msg):
        """Log at ALERT level (3)."""
        self.write(msg, 3)

    def close(self):
        """Close the underlying log handle, if one was ever attached."""
        handle = getattr(self, "log", None)
        if handle:
            handle.close()
            self.log = None
