main.py 8.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241
  1. #!/usr/bin/env python
  2. # -*- coding: utf-8 -*-
  3. '''
  4. @Author :liuyuqi.gov@msn.cn
  5. @Time :2018/7/4 15:55
  6. @File :main.py
  7. '''
  8. import time
  9. from configparser import ConfigParser
  10. import pandas as pd
  11. import libs.save_conf
  12. class Scheduling():
  13. '''
  14. 调度
  15. '''
  16. # 参数
  17. alpha = 10
  18. beta = 0.5
  19. T = 98
  20. EXEC_LIMIT = 100000
  21. # 静态数据 n:app数 N:inst数 m:machine数 k:资源种类
  22. n = N = m = 0
  23. k = 200
  24. cpuIter = list()
  25. appIndex = {}
  26. machineIndex = {}
  27. inst2AppIndex = {}
  28. appIndexference = {}
  29. apps = list()
  30. machines = list()
  31. appResources = list()
  32. machineResources = list()
  33. # 动态数据
  34. inst2Machine = {}
  35. machineResourcesUsed = list()
  36. machineHasApp = pd.DataFrame(columns=list(["instanceid", "machineid"]), data=list())
  37. def __init__(self, **kw):
  38. for k, v in kw.items():
  39. setattr(self, k, v)
  40. def loadData(self):
  41. ''' n
  42. app_resources.csv
  43. m
  44. machine_resources.csv
  45. N
  46. instance_deploy.csv
  47. iterference_cnt
  48. app_interference.csv
  49. judge framework
  50. '''
  51. app_interference, app_resources, instance_deploy, machine_resources = self.getConfig()
  52. # 1.app_resources 9338*201
  53. self.appResources = pd.read_csv(app_resources, header=None,
  54. names=list(["appid", "cpu", "mem", "disk", "P", "M", "PM"]), encoding="utf-8")
  55. tmp_cpu = self.appResources["cpu"].str.split('|', expand=True).astype('float')
  56. tmp_mem = self.appResources["mem"].str.split('|', expand=True).astype('float')
  57. for i in range(self.T):
  58. # 新添加98列CPU限制
  59. # self.appResources["cpu_" + str(i)] = None
  60. # self.appResources["mem_" + str(i)] = None
  61. # 赋值
  62. self.appResources["cpu_" + str(i)] = tmp_cpu[i]
  63. self.appResources["mem_" + str(i)] = tmp_mem[i]
  64. # 去掉cpu/men两列
  65. self.appResources.pop("cpu")
  66. self.appResources.pop("mem")
  67. self.n, col = self.appResources.shape # 9338*201 201列:appid,cpu_1,cpu_2,...mem_1,men_2....,P,M,PM
  68. # 2.machine_resources 6000*201
  69. self.machineResources = pd.read_csv(machine_resources, header=None, names=list(
  70. ["machineid", "cpu", "mem", "disk", "P", "M", "PM"]), encoding="utf-8")
  71. for i in range(self.T):
  72. self.machineResources["cpu_" + str(i)] = self.machineResources["cpu"]
  73. self.machineResources["mem_" + str(i)] = self.machineResources["mem"]
  74. self.machineResources.pop("cpu")
  75. self.machineResources.pop("mem")
  76. self.machineResourcesUsed = self.machineResources.copy()
  77. for i in range(200):
  78. self.machineResourcesUsed.iloc[:, i + 1] = 0
  79. # 3.instance_deploy
  80. self.inst2Machine = pd.read_csv(instance_deploy, header=None,
  81. names=list(["instanceid", "appid", "machineid"]), encoding="utf-8")
  82. # 增加一个字段标注是否部署
  83. self.inst2Machine["isdeploy"] = False
  84. # # 4.app_interference 冲突表
  85. self.appIndexference = pd.read_csv(app_interference, header=None,
  86. names=list(["appid1", "appid2", "max_interference"]), encoding="utf-8")
  87. # instance按照磁盘消耗排序
  88. self.n, col = self.appResources.shape
  89. self.N, col = self.machineResources.shape
  90. self.m, col = self.inst2Machine.shape
  91. def getConfig(self):
  92. '''
  93. step1: 数据参数初始化
  94. :return:
  95. '''
  96. # 生成配置文件
  97. self.init_conf()
  98. # 读取配置文件
  99. cf = ConfigParser()
  100. config_path = "../conf/config.ini"
  101. section_name = "data_file_name"
  102. cf.read(config_path)
  103. app_interference = cf.get(section_name, "app_interference")
  104. app_resources = cf.get(section_name, "app_resources")
  105. instance_deploy = cf.get(section_name, "instance_deploy")
  106. machine_resources = cf.get(section_name, "machine_resources")
  107. return app_interference, app_resources, instance_deploy, machine_resources
  108. def init_conf(self):
  109. '''
  110. 初始化配置文件
  111. :retur
  112. '''
  113. libs.save_conf.write()
  114. def sort_dynamic(self):
  115. print("ss")
  116. def pickInstance(self, instanceid):
  117. '''
  118. 先将instance从部署的主机中删除,删除一行,释放资源
  119. :return:
  120. '''
  121. self.inst2Machine.pop(instanceid)
  122. def toMachine(self, instanceid, machineid, doCheck=True):
  123. '''
  124. 检查互斥条件,然后把instance放入主机
  125. :param instanceid: 实例id
  126. :param machineid: 主机id
  127. :param doCheck: 是否检测资源限制
  128. :return: True和False
  129. '''
  130. appid = self.inst2Machine[self.inst2Machine["instanceid"] == instanceid]["appid"].values[0]
  131. if doCheck:
  132. # 检查互斥
  133. # 检查资源限制
  134. for i in range(self.k):
  135. if (
  136. self.machineResourcesUsed[self.machineResourcesUsed["machineid"] == machineid].iloc[:,
  137. i + 1].values[0] +
  138. self.appResources[self.appResources["appid"] == appid].iloc[:, i + 1].values[0]
  139. >
  140. self.machineResources[self.machineResources["machineid"] == machineid].iloc[:,
  141. i + 1].values[0]):
  142. print("Resource Limit: instance: ", instanceid, ",", "machine:", machineid,
  143. self.machineResourcesUsed[self.machineResourcesUsed["machineid"] == machineid].iloc[:,
  144. i + 1].values[0], "+",
  145. self.appResources[self.appResources["appid"] == appid].iloc[:, i + 1].values[0], " >",
  146. self.machineResources[self.machineResources["machineid"] == machineid].iloc[:,
  147. i + 1].values[0])
  148. # 如果不符合则 return False
  149. return False
  150. # instance占用资源
  151. for i in range(self.k):
  152. self.machineResourcesUsed[self.machineResourcesUsed["machineid"] == machineid].iloc[:, i + 1].values[0] += \
  153. self.appResources[self.appResources["appid"] == appid].iloc[:, i + 1].values[0]
  154. return True
  155. def run(self):
  156. '''
  157. :return:
  158. '''
  159. # 已经部署的instance
  160. deployed_Instance = self.inst2Machine.loc[pd.isna(self.inst2Machine["machineid"]) == False]
  161. count_deployed_Instance, col = deployed_Instance.shape
  162. deployed_Instance = deployed_Instance.reset_index(drop=True)
  163. # 将已经部署的instance放置到对应主机中,占用相应资源,这一块代码比java慢了太多
  164. for i in range(count_deployed_Instance):
  165. print(i)
  166. instanceid = deployed_Instance["instanceid"][i]
  167. machineid = deployed_Instance["machineid"][i]
  168. self.toMachine(instanceid, machineid, doCheck=False)
  169. # 先对已经部署的主机列表按照资源消耗进行排序
  170. # 先使用大主机,磁盘优先计算限制条件
  171. row1, col = self.inst2Machine.shape
  172. while row1 > 0:
  173. # 每部署一次,消耗一个主机
  174. for row2 in self.inst2Machine.itertuples():
  175. if row2.
  176. self.toMachine(row2)
  177. # 筛选未部署的
  178. self.inst2Machine = self.inst2Machine[self.inst2Machine["isdeploy"] == False]
  179. row, col = self.inst2Machine.shape
  180. self.inst2Machine = self.inst2Machine.reset_index(drop=True)
  181. j = j + 1
  182. print("已经部署:", 68219 - row, "剩余部署Instance数据:", row)
  183. print("已经消耗Machine主机数据:", j)
  184. print("部署方案前几条示意:", self.machineHasApp.head())
  185. libs.save_result.save_result(self.machineHasApp)
  186. def deployInstance(self):
  187. '''
  188. 部署逻辑
  189. :return:
  190. '''
  191. pass
  192. if __name__ == '__main__':
  193. print("------------开始部署啦--------------")
  194. start = time.time()
  195. scheduling = Scheduling()
  196. # 加载数据
  197. scheduling.loadData()
  198. # 开始调度
  199. scheduling.run()
  200. # 部署完事
  201. print("------------部署完啦--------------")
  202. end = time.time()
  203. print("总共耗时:", end - start, "秒")