|
@@ -11,7 +11,8 @@ from configparser import ConfigParser
|
|
|
|
|
|
import pandas as pd
|
|
|
|
|
|
-import libs.save_conf
|
|
|
+import utils.save_conf
|
|
|
+import utils.save_result
|
|
|
|
|
|
|
|
|
class Scheduling():
|
|
@@ -26,88 +27,93 @@ class Scheduling():
|
|
|
EXEC_LIMIT = 100000
|
|
|
|
|
|
# 静态数据 n:app数 N:inst数 m:machine数 k:资源种类
|
|
|
- n = N = m = 0
|
|
|
- k = 200
|
|
|
+ num_app = num_inst = num_mac = 0
|
|
|
+ num_k = 200
|
|
|
cpuIter = list()
|
|
|
appIndex = {}
|
|
|
machineIndex = {}
|
|
|
inst2AppIndex = {}
|
|
|
- appIndexference = {}
|
|
|
|
|
|
- apps = list()
|
|
|
- machines = list()
|
|
|
- appResources = list()
|
|
|
- machineResources = list()
|
|
|
+ # apps = list()
|
|
|
+ # machines = list()
|
|
|
+ appResources = list() # app表,
|
|
|
+ machineResources = list() # machine表
|
|
|
+ instanceDeploy = list() # instance表
|
|
|
+ appInterference = list() # app_interfence冲突表
|
|
|
|
|
|
# 动态数据
|
|
|
- inst2Machine = {}
|
|
|
+ inst2MachineRemine = {}
|
|
|
machineResourcesUsed = list()
|
|
|
- machineHasApp = pd.DataFrame(columns=list(["instanceid", "machineid"]), data=list())
|
|
|
+ machineHasApp = list() # 6000 [{}, {},{6004=1, 9126=1, 1598=1}, {}, {}, {},
|
|
|
+ inst2Machine = list()
|
|
|
+ result = pd.DataFrame(columns=list(["instanceid", "machineid"]), data=list())
|
|
|
|
|
|
def __init__(self, **kw):
|
|
|
+ '''
|
|
|
+ 初始化参数
|
|
|
+ :param kw:
|
|
|
+ '''
|
|
|
for k, v in kw.items():
|
|
|
setattr(self, k, v)
|
|
|
|
|
|
def loadData(self):
|
|
|
- ''' n
|
|
|
- app_resources.csv
|
|
|
- m
|
|
|
- machine_resources.csv
|
|
|
- N
|
|
|
- instance_deploy.csv
|
|
|
- iterference_cnt
|
|
|
- app_interference.csv
|
|
|
- judge framework
|
|
|
- '''
|
|
|
+ for i in range(self.T):
|
|
|
+ self.cpuIter.append(i)
|
|
|
app_interference, app_resources, instance_deploy, machine_resources = self.getConfig()
|
|
|
# 1.app_resources 9338*201
|
|
|
self.appResources = pd.read_csv(app_resources, header=None,
|
|
|
names=list(["appid", "cpu", "mem", "disk", "P", "M", "PM"]), encoding="utf-8")
|
|
|
-
|
|
|
tmp_cpu = self.appResources["cpu"].str.split('|', expand=True).astype('float')
|
|
|
tmp_mem = self.appResources["mem"].str.split('|', expand=True).astype('float')
|
|
|
-
|
|
|
for i in range(self.T):
|
|
|
- # 新添加98列CPU限制
|
|
|
- # self.appResources["cpu_" + str(i)] = None
|
|
|
- # self.appResources["mem_" + str(i)] = None
|
|
|
- # 赋值
|
|
|
self.appResources["cpu_" + str(i)] = tmp_cpu[i]
|
|
|
self.appResources["mem_" + str(i)] = tmp_mem[i]
|
|
|
-
|
|
|
# 去掉cpu/men两列
|
|
|
self.appResources.pop("cpu")
|
|
|
self.appResources.pop("mem")
|
|
|
- self.n, col = self.appResources.shape # 9338*201 201列:appid,cpu_1,cpu_2,...mem_1,men_2....,P,M,PM
|
|
|
+ self.num_app, col = self.appResources.shape # 9338*201 201列:appid,cpu_1,cpu_2,...mem_1,men_2....,P,M,PM
|
|
|
+ self.appResources["appid"] = pd.to_numeric(self.appResources["appid"].str.split("_", expand=True)[1].values)
|
|
|
|
|
|
# 2.machine_resources 6000*201
|
|
|
self.machineResources = pd.read_csv(machine_resources, header=None, names=list(
|
|
|
["machineid", "cpu", "mem", "disk", "P", "M", "PM"]), encoding="utf-8")
|
|
|
-
|
|
|
+ self.num_mac, col = self.machineResources.shape
|
|
|
for i in range(self.T):
|
|
|
self.machineResources["cpu_" + str(i)] = self.machineResources["cpu"]
|
|
|
self.machineResources["mem_" + str(i)] = self.machineResources["mem"]
|
|
|
self.machineResources.pop("cpu")
|
|
|
self.machineResources.pop("mem")
|
|
|
-
|
|
|
+ self.machineResources["machineid"] = pd.to_numeric(
|
|
|
+ self.machineResources["machineid"].str.split("_", expand=True)[1].values)
|
|
|
self.machineResourcesUsed = self.machineResources.copy()
|
|
|
-
|
|
|
for i in range(200):
|
|
|
self.machineResourcesUsed.iloc[:, i + 1] = 0
|
|
|
+ # 初始化 6000个空字典组成的list[{},{}....]
|
|
|
+ for i in range(self.num_mac):
|
|
|
+ self.machineHasApp.append({})
|
|
|
|
|
|
# 3.instance_deploy
|
|
|
- self.inst2Machine = pd.read_csv(instance_deploy, header=None,
|
|
|
- names=list(["instanceid", "appid", "machineid"]), encoding="utf-8")
|
|
|
+ self.instanceDeploy = pd.read_csv(instance_deploy, header=None,
|
|
|
+ names=list(["instanceid", "appid", "machineid"]), encoding="utf-8")
|
|
|
# 增加一个字段标注是否部署
|
|
|
- self.inst2Machine["isdeploy"] = False
|
|
|
- # # 4.app_interference 冲突表
|
|
|
- self.appIndexference = pd.read_csv(app_interference, header=None,
|
|
|
+ self.instanceDeploy["isdeploy"] = False
|
|
|
+ self.instanceDeploy["instanceid"] = pd.to_numeric(
|
|
|
+ self.instanceDeploy["instanceid"].str.split("_", expand=True)[1].values)
|
|
|
+ self.instanceDeploy["appid"] = pd.to_numeric(self.instanceDeploy["appid"].str.split("_", expand=True)[1].values)
|
|
|
+ self.instanceDeploy["machineid"] = pd.to_numeric(
|
|
|
+ self.instanceDeploy["machineid"].str.split("_", expand=True)[1].values)
|
|
|
+
|
|
|
+ # 4.app_interference 冲突表
|
|
|
+ self.appInterference = pd.read_csv(app_interference, header=None,
|
|
|
names=list(["appid1", "appid2", "max_interference"]), encoding="utf-8")
|
|
|
+ self.appInterference["appid1"] = pd.to_numeric(
|
|
|
+ self.appInterference["appid1"].str.split("_", expand=True)[1].values)
|
|
|
+ self.appInterference["appid2"] = pd.to_numeric(
|
|
|
+ self.appInterference["appid2"].str.split("_", expand=True)[1].values)
|
|
|
|
|
|
# instance按照磁盘消耗排序
|
|
|
- self.n, col = self.appResources.shape
|
|
|
- self.N, col = self.machineResources.shape
|
|
|
- self.m, col = self.inst2Machine.shape
|
|
|
+ self.num_app, col = self.appResources.shape
|
|
|
+ self.num_inst, col = self.instanceDeploy.shape
|
|
|
|
|
|
def getConfig(self):
|
|
|
'''
|
|
@@ -115,7 +121,8 @@ class Scheduling():
|
|
|
:return:
|
|
|
'''
|
|
|
# 生成配置文件
|
|
|
- self.init_conf()
|
|
|
+ # self.init_conf()
|
|
|
+
|
|
|
# 读取配置文件
|
|
|
cf = ConfigParser()
|
|
|
config_path = "../conf/config.ini"
|
|
@@ -133,7 +140,7 @@ class Scheduling():
|
|
|
初始化配置文件
|
|
|
:retur
|
|
|
'''
|
|
|
- libs.save_conf.write()
|
|
|
+ utils.save_conf.write()
|
|
|
|
|
|
def sort_dynamic(self):
|
|
|
print("ss")
|
|
@@ -143,23 +150,35 @@ class Scheduling():
|
|
|
先将instance从部署的主机中删除,删除一行,释放资源
|
|
|
:return:
|
|
|
'''
|
|
|
+ if instanceid not in self.inst2Machine["instance"]:
|
|
|
+ return
|
|
|
+ appid = self.inst2Machine[self.inst2Machine["instanceid"] == instanceid]["appid"].values[0]
|
|
|
+
|
|
|
+ # 更新machineResourcesUsed
|
|
|
+ for i in range(self.num_k):
|
|
|
+ machineResourcesUsed[fromMachine][j] -= appResources[appIt][i]
|
|
|
+ fromMachine = self.inst2AppIndex
|
|
|
self.inst2Machine.pop(instanceid)
|
|
|
|
|
|
- def toMachine(self, instanceid, machineid, doCheck=True):
|
|
|
+ def toMachine(self, instanceid, machineid, doCheck=False):
|
|
|
'''
|
|
|
检查互斥条件,然后把instance放入主机
|
|
|
+ :rtype: object
|
|
|
:param instanceid: 实例id
|
|
|
:param machineid: 主机id
|
|
|
:param doCheck: 是否检测资源限制
|
|
|
:return: True和False
|
|
|
'''
|
|
|
- appid = self.inst2Machine[self.inst2Machine["instanceid"] == instanceid]["appid"].values[0]
|
|
|
+ # instanceid所属的appid
|
|
|
+ appid = self.instanceDeploy[self.instanceDeploy["instanceid"] == instanceid]["appid"].values[0]
|
|
|
+ # machineid从1开始,而index从0开始
|
|
|
+ hasApp = self.machineHasApp[int(machineid - 1)]
|
|
|
if doCheck:
|
|
|
# 检查互斥
|
|
|
|
|
|
|
|
|
# 检查资源限制
|
|
|
- for i in range(self.k):
|
|
|
+ for i in range(self.num_k):
|
|
|
if (
|
|
|
self.machineResourcesUsed[self.machineResourcesUsed["machineid"] == machineid].iloc[:,
|
|
|
i + 1].values[0] +
|
|
@@ -175,56 +194,83 @@ class Scheduling():
|
|
|
i + 1].values[0])
|
|
|
# 如果不符合则 return False
|
|
|
return False
|
|
|
- # instance占用资源
|
|
|
- for i in range(self.k):
|
|
|
- self.machineResourcesUsed[self.machineResourcesUsed["machineid"] == machineid].iloc[:, i + 1].values[0] += \
|
|
|
+ # 将inst放入新的machine,占用资源
|
|
|
+ self.inst2Machine.append([{"instanceid": instanceid, "machineid": machineid}])
|
|
|
+ if appid not in hasApp:
|
|
|
+ hasApp.update({appid: 1})
|
|
|
+ else:
|
|
|
+ hasApp.update({appid: hasApp.get(appid) + 1})
|
|
|
+ for i in range(self.num_k):
|
|
|
+ self.machineResourcesUsed[self.machineResourcesUsed["machineid"] == machineid].iloc[:, i + 1].values[
|
|
|
+ 0] += \
|
|
|
self.appResources[self.appResources["appid"] == appid].iloc[:, i + 1].values[0]
|
|
|
return True
|
|
|
|
|
|
- def run(self):
|
|
|
+ def run(self, start):
|
|
|
'''
|
|
|
-
|
|
|
+ 执行部署
|
|
|
:return:
|
|
|
'''
|
|
|
# 已经部署的instance
|
|
|
- deployed_Instance = self.inst2Machine.loc[pd.isna(self.inst2Machine["machineid"]) == False]
|
|
|
+ deployed_Instance = self.instanceDeploy.loc[pd.isna(self.instanceDeploy["machineid"]) == False]
|
|
|
count_deployed_Instance, col = deployed_Instance.shape
|
|
|
- deployed_Instance = deployed_Instance.reset_index(drop=True)
|
|
|
+ deployed_Instance.reset_index(drop=True, inplace=True)
|
|
|
|
|
|
# 将已经部署的instance放置到对应主机中,占用相应资源,这一块代码比java慢了太多
|
|
|
for i in range(count_deployed_Instance):
|
|
|
- print(i)
|
|
|
instanceid = deployed_Instance["instanceid"][i]
|
|
|
machineid = deployed_Instance["machineid"][i]
|
|
|
self.toMachine(instanceid, machineid, doCheck=False)
|
|
|
+ print("初始部署第", i, "个,持续耗时", time.time() - start, "秒")
|
|
|
|
|
|
- # 先对已经部署的主机列表按照资源消耗进行排序
|
|
|
+ # 对instance同样按照disk消耗排序
|
|
|
+ self.instanceDeploy = self.instanceDeploy.sort_values(ascending=False, by="disk")
|
|
|
|
|
|
- # 先使用大主机,磁盘优先计算限制条件
|
|
|
- row1, col = self.inst2Machine.shape
|
|
|
+ # 然后通过ff方法,把instance放入machine中。每次放入instance后,队列删除,每次消耗主机i,删除主机i.先使用大主机,磁盘优先计算限制条件
|
|
|
+ row1, col = self.instanceDeploy.shape
|
|
|
while row1 > 0:
|
|
|
- # 每部署一次,消耗一个主机
|
|
|
- for row2 in self.inst2Machine.itertuples():
|
|
|
- if row2.
|
|
|
- self.toMachine(row2)
|
|
|
+ # 先对主机列表按照disk剩余进行排序,降序
|
|
|
+ self.machineResourcesUsed = self.machineResourcesUsed.sort_values(ascending=False, by="disk")
|
|
|
|
|
|
+ # 每部署一次,消耗一个主机
|
|
|
+ self.deployInstance()
|
|
|
# 筛选未部署的
|
|
|
- self.inst2Machine = self.inst2Machine[self.inst2Machine["isdeploy"] == False]
|
|
|
- row, col = self.inst2Machine.shape
|
|
|
- self.inst2Machine = self.inst2Machine.reset_index(drop=True)
|
|
|
+ self.instanceDeploy = self.instanceDeploy[self.instanceDeploy["isdeploy"] == False]
|
|
|
+ row, col = self.instanceDeploy.shape
|
|
|
+ self.instanceDeploy.reset_index(drop=True, inplace=True)
|
|
|
j = j + 1
|
|
|
|
|
|
print("已经部署:", 68219 - row, "剩余部署Instance数据:", row)
|
|
|
print("已经消耗Machine主机数据:", j)
|
|
|
- print("部署方案前几条示意:", self.machineHasApp.head())
|
|
|
- libs.save_result.save_result(self.machineHasApp)
|
|
|
+ print("部署方案前几条示意:", self.result.head())
|
|
|
+ utils.save_result.save_result(self.result)
|
|
|
+
|
|
|
+ def dcmp(self, x):
|
|
|
+ '''
|
|
|
+ 将结果映射到-1,0,1
|
|
|
+ :param x:
|
|
|
+ :return:
|
|
|
+ '''
|
|
|
+ if abs(x) < 1e-9:
|
|
|
+ return 0
|
|
|
+ elif x > 0:
|
|
|
+ return 1
|
|
|
+ else:
|
|
|
+ return -1
|
|
|
|
|
|
def deployInstance(self):
|
|
|
'''
|
|
|
部署逻辑
|
|
|
:return:
|
|
|
'''
|
|
|
- pass
|
|
|
+ # 主机:self.machineResourcesUsed["machineid"][0]
|
|
|
+ for row in self.instanceDeploy.itertuples():
|
|
|
+ i = row.Index
|
|
|
+ # 当前row实例尝试部署到新主机,如果可以部署则部署,如果初始已经部署,则算迁移,释放原来主机资源
|
|
|
+ if self.toMachine(row, machineid="", doCheck=True):
|
|
|
+ machineHasApp = machineHasApp.append(pd.DataFrame(
|
|
|
+ [{"instanceid": row.instanceid,
|
|
|
+ "machineid": "machine_" + str(j)}]))
|
|
|
|
|
|
|
|
|
if __name__ == '__main__':
|
|
@@ -234,7 +280,7 @@ if __name__ == '__main__':
|
|
|
# 加载数据
|
|
|
scheduling.loadData()
|
|
|
# 开始调度
|
|
|
- scheduling.run()
|
|
|
+ scheduling.run(start)
|
|
|
# 部署完事
|
|
|
print("------------部署完啦--------------")
|
|
|
end = time.time()
|