123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288 |
- #!/usr/bin/env python
- # -*- coding: utf-8 -*-
- '''
- @Auther :liuyuqi.gov@msn.cn
- @Time :2018/7/4 15:55
- @File :main.py
- '''
- import time
- from configparser import ConfigParser
- import pandas as pd
- import utils.save_conf
- import utils.save_result
- class Scheduling():
- '''
- 调度
- '''
- # 参数
- alpha = 10
- beta = 0.5
- T = 98
- EXEC_LIMIT = 100000
- # 静态数据 n:app数 N:inst数 m:machine数 k:资源种类
- num_app = num_inst = num_mac = 0
- num_k = 200
- cpuIter = list()
- appIndex = {}
- machineIndex = {}
- inst2AppIndex = {}
- # apps = list()
- # machines = list()
- appResources = list() # app表,
- machineResources = list() # machine表
- instanceDeploy = list() # instance表
- appInterference = list() # app_interfence冲突表
- # 动态数据
- inst2MachineRemine = {}
- machineResourcesUsed = list()
- machineHasApp = list() # 6000 [{}, {},{6004=1, 9126=1, 1598=1}, {}, {}, {},
- inst2Machine = list()
- result = pd.DataFrame(columns=list(["instanceid", "machineid"]), data=list())
- def __init__(self, **kw):
- '''
- 初始化参数
- :param kw:
- '''
- for k, v in kw.items():
- setattr(self, k, v)
- def loadData(self):
- for i in range(self.T):
- self.cpuIter.append(i)
- app_interference, app_resources, instance_deploy, machine_resources = self.getConfig()
- # 1.app_resources 9338*201
- self.appResources = pd.read_csv(app_resources, header=None,
- names=list(["appid", "cpu", "mem", "disk", "P", "M", "PM"]), encoding="utf-8")
- tmp_cpu = self.appResources["cpu"].str.split('|', expand=True).astype('float')
- tmp_mem = self.appResources["mem"].str.split('|', expand=True).astype('float')
- for i in range(self.T):
- self.appResources["cpu_" + str(i)] = tmp_cpu[i]
- self.appResources["mem_" + str(i)] = tmp_mem[i]
- # 去掉cpu/men两列
- self.appResources.pop("cpu")
- self.appResources.pop("mem")
- self.num_app, col = self.appResources.shape # 9338*201 201列:appid,cpu_1,cpu_2,...mem_1,men_2....,P,M,PM
- self.appResources["appid"] = pd.to_numeric(self.appResources["appid"].str.split("_", expand=True)[1].values)
- # 2.machine_resources 6000*201
- self.machineResources = pd.read_csv(machine_resources, header=None, names=list(
- ["machineid", "cpu", "mem", "disk", "P", "M", "PM"]), encoding="utf-8")
- self.num_mac, col = self.machineResources.shape
- for i in range(self.T):
- self.machineResources["cpu_" + str(i)] = self.machineResources["cpu"]
- self.machineResources["mem_" + str(i)] = self.machineResources["mem"]
- self.machineResources.pop("cpu")
- self.machineResources.pop("mem")
- self.machineResources["machineid"] = pd.to_numeric(
- self.machineResources["machineid"].str.split("_", expand=True)[1].values)
- self.machineResourcesUsed = self.machineResources.copy()
- for i in range(200):
- self.machineResourcesUsed.iloc[:, i + 1] = 0
- # 初始化 6000个空字典组成的list[{},{}....]
- for i in range(self.num_mac):
- self.machineHasApp.append({})
- # 3.instance_deploy
- self.instanceDeploy = pd.read_csv(instance_deploy, header=None,
- names=list(["instanceid", "appid", "machineid"]), encoding="utf-8")
- # 增加一个字段标注是否部署
- self.instanceDeploy["isdeploy"] = False
- self.instanceDeploy["instanceid"] = pd.to_numeric(
- self.instanceDeploy["instanceid"].str.split("_", expand=True)[1].values)
- self.instanceDeploy["appid"] = pd.to_numeric(self.instanceDeploy["appid"].str.split("_", expand=True)[1].values)
- self.instanceDeploy["machineid"] = pd.to_numeric(
- self.instanceDeploy["machineid"].str.split("_", expand=True)[1].values)
- # 4.app_interference 冲突表
- self.appInterference = pd.read_csv(app_interference, header=None,
- names=list(["appid1", "appid2", "max_interference"]), encoding="utf-8")
- self.appInterference["appid1"] = pd.to_numeric(
- self.appInterference["appid1"].str.split("_", expand=True)[1].values)
- self.appInterference["appid2"] = pd.to_numeric(
- self.appInterference["appid2"].str.split("_", expand=True)[1].values)
- # instance按照磁盘消耗排序
- self.num_app, col = self.appResources.shape
- self.num_inst, col = self.instanceDeploy.shape
- def getConfig(self):
- '''
- step1: 数据参数初始化
- :return:
- '''
- # 生成配置文件
- # self.init_conf()
- # 读取配置文件
- cf = ConfigParser()
- config_path = "../conf/config.ini"
- section_name = "data_file_name"
- cf.read(config_path)
- app_interference = cf.get(section_name, "app_interference")
- app_resources = cf.get(section_name, "app_resources")
- instance_deploy = cf.get(section_name, "instance_deploy")
- machine_resources = cf.get(section_name, "machine_resources")
- return app_interference, app_resources, instance_deploy, machine_resources
- def init_conf(self):
- '''
- 初始化配置文件
- :retur
- '''
- utils.save_conf.write()
- def sort_dynamic(self):
- print("ss")
- def pickInstance(self, instanceid):
- '''
- 先将instance从部署的主机中删除,删除一行,释放资源
- :return:
- '''
- if instanceid not in self.inst2Machine["instance"]:
- return
- appid = self.inst2Machine[self.inst2Machine["instanceid"] == instanceid]["appid"].values[0]
- # 更新machineResourcesUsed
- for i in range(self.num_k):
- machineResourcesUsed[fromMachine][j] -= appResources[appIt][i]
- fromMachine = self.inst2AppIndex
- self.inst2Machine.pop(instanceid)
- def toMachine(self, instanceid, machineid, doCheck=False):
- '''
- 检查互斥条件,然后把instance放入主机
- :rtype: object
- :param instanceid: 实例id
- :param machineid: 主机id
- :param doCheck: 是否检测资源限制
- :return: True和False
- '''
- # instanceid所属的appid
- appid = self.instanceDeploy[self.instanceDeploy["instanceid"] == instanceid]["appid"].values[0]
- # machineid从1开始,而index从0开始
- hasApp = self.machineHasApp[int(machineid - 1)]
- if doCheck:
- # 检查互斥
- # 检查资源限制
- for i in range(self.num_k):
- if (
- self.machineResourcesUsed[self.machineResourcesUsed["machineid"] == machineid].iloc[:,
- i + 1].values[0] +
- self.appResources[self.appResources["appid"] == appid].iloc[:, i + 1].values[0]
- >
- self.machineResources[self.machineResources["machineid"] == machineid].iloc[:,
- i + 1].values[0]):
- print("Resource Limit: instance: ", instanceid, ",", "machine:", machineid,
- self.machineResourcesUsed[self.machineResourcesUsed["machineid"] == machineid].iloc[:,
- i + 1].values[0], "+",
- self.appResources[self.appResources["appid"] == appid].iloc[:, i + 1].values[0], " >",
- self.machineResources[self.machineResources["machineid"] == machineid].iloc[:,
- i + 1].values[0])
- # 如果不符合则 return False
- return False
- # 将inst放入新的machine,占用资源
- self.inst2Machine.append([{"instanceid": instanceid, "machineid": machineid}])
- if appid not in hasApp:
- hasApp.update({appid: 1})
- else:
- hasApp.update({appid: hasApp.get(appid) + 1})
- for i in range(self.num_k):
- self.machineResourcesUsed[self.machineResourcesUsed["machineid"] == machineid].iloc[:, i + 1].values[
- 0] += \
- self.appResources[self.appResources["appid"] == appid].iloc[:, i + 1].values[0]
- return True
- def run(self, start):
- '''
- 执行部署
- :return:
- '''
- # 已经部署的instance
- deployed_Instance = self.instanceDeploy.loc[pd.isna(self.instanceDeploy["machineid"]) == False]
- count_deployed_Instance, col = deployed_Instance.shape
- deployed_Instance.reset_index(drop=True, inplace=True)
- # 将已经部署的instance放置到对应主机中,占用相应资源,这一块代码比java慢了太多
- for i in range(count_deployed_Instance):
- instanceid = deployed_Instance["instanceid"][i]
- machineid = deployed_Instance["machineid"][i]
- self.toMachine(instanceid, machineid, doCheck=False)
- print("初始部署第", i, "个,持续耗时", time.time() - start, "秒")
- # 对instance同样按照disk消耗排序
- self.instanceDeploy = self.instanceDeploy.sort_values(ascending=False, by="disk")
- # 然后通过ff方法,把instance放入machine中。每次放入instance后,队列删除,每次消耗主机i,删除主机i.先使用大主机,磁盘优先计算限制条件
- row1, col = self.instanceDeploy.shape
- while row1 > 0:
- # 先对主机列表按照disk剩余进行排序,降序
- self.machineResourcesUsed = self.machineResourcesUsed.sort_values(ascending=False, by="disk")
- # 每部署一次,消耗一个主机
- self.deployInstance()
- # 筛选未部署的
- self.instanceDeploy = self.instanceDeploy[self.instanceDeploy["isdeploy"] == False]
- row, col = self.instanceDeploy.shape
- self.instanceDeploy.reset_index(drop=True, inplace=True)
- j = j + 1
- print("已经部署:", 68219 - row, "剩余部署Instance数据:", row)
- print("已经消耗Machine主机数据:", j)
- print("部署方案前几条示意:", self.result.head())
- utils.save_result.save_result(self.result)
- def dcmp(self, x):
- '''
- 将结果映射到-1,0,1
- :param x:
- :return:
- '''
- if abs(x) < 1e-9:
- return 0
- elif x > 0:
- return 1
- else:
- return -1
- def deployInstance(self):
- '''
- 部署逻辑
- :return:
- '''
- # 主机:self.machineResourcesUsed["machineid"][0]
- for row in self.instanceDeploy.itertuples():
- i = row.Index
- # 当前row实例尝试部署到新主机,如果可以部署则部署,如果初始已经部署,则算迁移,释放原来主机资源
- if self.toMachine(row, machineid="", doCheck=True):
- machineHasApp = machineHasApp.append(pd.DataFrame(
- [{"instanceid": row.instanceid,
- "machineid": "machine_" + str(j)}]))
- if __name__ == '__main__':
- print("------------开始部署啦--------------")
- start = time.time()
- scheduling = Scheduling()
- # 加载数据
- scheduling.loadData()
- print("加载数据耗时:", time.time() - start)
- # 开始调度
- scheduling.run(start)
- # 部署完事
- print("------------部署完啦--------------")
- end = time.time()
- print("总共耗时:", end - start, "秒")
|