Browse Source

继续完善调度算法,尚未完成。

liuyuqi-dellpc 6 years ago
parent
commit
43278a0df8
11 changed files with 400 additions and 20 deletions
  1. 50 0
      code/app.py
  2. 4 4
      code/data_preview.py
  3. 60 0
      code/instance.py
  4. 33 0
      code/instance_deploy.py
  5. 21 5
      code/main.py
  6. 15 0
      code/plot.py
  7. 27 5
      code/save_conf.py
  8. 123 0
      code/sort_by_disk.py
  9. 30 6
      code/test_pandas.py
  10. 10 0
      twtech/__init__.py
  11. 27 0
      twtech/config.py

+ 50 - 0
code/app.py

@@ -0,0 +1,50 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+'''
+对app表处理,计算平均CPU,mem,在app表添加两列保存其值
+@Auther :liuyuqi.gov@msn.cn
+@Time :2018/7/7 3:14
+@File :app.py
+'''
+
+import matplotlib
+
+matplotlib.use('Agg')
+
+# 数据预览
+import pandas as pd
+from configparser import ConfigParser
+
+# step1: 数据参数初始化
+
+cf = ConfigParser()
+config_path = "../conf/config.ini"
+section_name = "data_file_name"
+cf.read(config_path)
+
+app_interference = cf.get(section_name, "app_interference")
+app_resources = cf.get(section_name, "app_resources")
+instance_deploy = cf.get(section_name, "instance_deploy")
+machine_resources = cf.get(section_name, "machine_resources")
+
+# app表
+df1 = pd.read_csv(app_resources, header=None,
+                  names=list(["appid", "cpu", "mem", "disk", "P", "M", "PM"]), encoding="utf-8")
+
+# 新添加两列
+df1["cpu_avg"] = None
+df1["mem_avg"] = None
+
+# expand=True表示
+tmp = df1["cpu"].str.split('|', expand=True).astype('float')
+# [9338 rows x 98 columns]
+df1["cpu_avg"] = tmp.T.mean().T  # 转置,求均值,再转置回来,这样求得一行的均值。
+
+tmp = df1["mem"].str.split('|', expand=True).astype('float')
+df1["mem_avg"] = tmp.T.mean().T  # 转置,求均值,再转置回来,这样求得一行的均值。
+print(df1.head())
+print("总共应用:", df1["appid"].unique().shape)
+
+df1.pop("cpu")
+df1.pop("mem")
+df1.to_csv("../data/app.csv")

+ 4 - 4
code/data_preview.py

@@ -94,11 +94,11 @@ def for_df3():
 
 
 def for_df4():
 def for_df4():
     # 主机和实例表。部署appid1的insterference最多可以部署n个appid2
     # 主机和实例表。部署appid1的insterference最多可以部署n个appid2
-    df = pd.read_csv(app_interference, header=None,
+    df4 = pd.read_csv(app_interference, header=None,
                      names=list(["appid1", "appid2", "max_interference"]), encoding="utf-8")
                      names=list(["appid1", "appid2", "max_interference"]), encoding="utf-8")
     # 查看数据类型
     # 查看数据类型
     # print(df.dtypes)
     # print(df.dtypes)
-    print("df数据大小:", df.shape)
+    print("df数据大小:", df4.shape)
 
 
     # 查看头尾部数据
     # 查看头尾部数据
     # app_8361  app_2163  0
     # app_8361  app_2163  0
@@ -121,9 +121,9 @@ def for_df4():
     # 第三列
     # 第三列
 
 
     # 描述性统计
     # 描述性统计
-    print("数据预览:", df.describe())
+    print("数据预览:", df4.describe())
 
 
-    plt.plot(df["max_interference"])
+    plt.plot(df4["max_interference"])
     plt.savefig("../submit/fig1.png")
     plt.savefig("../submit/fig1.png")
 
 
 
 

+ 60 - 0
code/instance.py

@@ -0,0 +1,60 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+'''
+按照app对instance分类
+@Auther :liuyuqi.gov@msn.cn
+@Time :2018/7/6 16:13
+@File :instance.py
+'''
+import matplotlib
+
+matplotlib.use('Agg')
+import pandas as pd
+from configparser import ConfigParser
+
+cf = ConfigParser()
+config_path = "../conf/config.ini"
+section_name = "data_file_name"
+cf.read(config_path)
+
+app_interference = cf.get(section_name, "app_interference")
+app_resources = cf.get(section_name, "app_resources")
+instance_deploy = cf.get(section_name, "instance_deploy")
+machine_resources = cf.get(section_name, "machine_resources")
+
+# app
+df1 = pd.read_csv(app_resources, header=None,
+                  names=list(["appid", "cpu", "mem", "disk", "P", "M", "PM"]), encoding="utf-8")
+
+# instance
+df3 = pd.read_csv(instance_deploy, header=None,
+                  names=list(["instanceid", "appid", "machineid"]))
+
+# instance分类统计
+group1 = df3.groupby("appid").count()
+print(type(group1))
+# print(group1["instanceid"].sort_values(ascending=False))
+# plt.plot(group1["instanceid"].sort_values(ascending=False))
+# plt.savefig("../submit/group1.jpg")
+
+# 找到每个instance消耗的disk
+
+df3["disk"] = None
+df3["mem"] = None
+df3["P"] = None
+df3["M"] = None
+df3["PM"] = None
+
+for i in range(0, int(cf.get("table_size", "instance_size"))):
+    # df1[df1["appid"] == df3["appid"][i]]["disk"]返回一个pd.Series对象(列表),其实只有一个值,需要选定第一个即可
+    df3["mem"][i] = df1[df1["appid"] == df3["appid"][i]]["mem"].values[0]
+    df3["cpu"][i] = df1[df1["appid"] == df3["appid"][i]]["cpu"].values[0]
+    df3["disk"][i] = df1[df1["appid"] == df3["appid"][i]]["disk"].values[0]
+    df3["P"][i] = df1[df1["appid"] == df3["appid"][i]]["P"].values[0]
+    df3["M"][i] = df1[df1["appid"] == df3["appid"][i]]["M"].values[0]
+    df3["PM"][i] = df1[df1["appid"] == df3["appid"][i]]["PM"].values[0]
+
+# ascending=False 降序
+df3.sort_values(ascending=False, by="disk")
+
+df3.to_csv("../data/instance.csv")

+ 33 - 0
code/instance_deploy.py

@@ -0,0 +1,33 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+'''
+@Auther :liuyuqi.gov@msn.cn
+@Time :2018/7/7 3:58
+@File :instance_deploy.py
+'''
+# 数据预览
+from configparser import ConfigParser
+
+import pandas as  pd
+
+# step1: 数据参数初始化
+
+cf = ConfigParser()
+config_path = "../conf/config.ini"
+section_name = "data_file_name"
+cf.read(config_path)
+
+app_interference = cf.get(section_name, "app_interference")
+app_resources = cf.get(section_name, "app_resources")
+instance_deploy = cf.get(section_name, "instance_deploy")
+machine_resources = cf.get(section_name, "machine_resources")
+
+df3 = pd.read_csv(instance_deploy, header=None,
+                  names=list(["instanceid", "appid", "machineid"]), encoding="utf-8")
+
+# print(df3[df3["machineid"] == "NaN"])
+# print(df3.head())
+
+print(pd.isna(df3["machineid"]).value_counts())
+# True     38223
+# False    29996 还有一半没有部署

+ 21 - 5
code/main.py

@@ -9,13 +9,32 @@ import os,sys
 import numpy as np,pandas as pd
 import numpy as np,pandas as pd
 import matplotlib.pyplot as plt
 import matplotlib.pyplot as plt
 
 
+# 数据预览
+import pandas as pd
+import matplotlib.pyplot as plt
+from configparser import ConfigParser
+
+# step1: 数据参数初始化
+
+cf = ConfigParser()
+config_path = "../conf/config.ini"
+section_name = "data_file_name"
+cf.read(config_path)
+
+app_interference = cf.get(section_name, "app_interference")
+app_resources = cf.get(section_name, "app_resources")
+instance_deploy = cf.get(section_name, "instance_deploy")
+machine_resources = cf.get(section_name, "machine_resources")
+
 #Wij矩阵表示第i个instance实例部署到j主机上
 #Wij矩阵表示第i个instance实例部署到j主机上
 Wij_size = np.zeros((68219, 6000))
 Wij_size = np.zeros((68219, 6000))
 Wij = np.zeros_like(Wij_size)
 Wij = np.zeros_like(Wij_size)
 
 
 def getWij():
 def getWij():
     # inst_26195, app_147, machine_1149
     # inst_26195, app_147, machine_1149
-    df3=pd.read_csv("../data/scheduling_preliminary_instance_deploy_20180606.csv", header=None,names=list(["instanceid", "appid", "machineid"]))
+    df3=pd.read_csv("../data/instance.csv", header=None,names=list(["instanceid", "appid", "machineid","disk"]))
+    df2 = pd.read_csv(machine_resources, header=None, names=list(
+        ["machineid", "cpu", "mem", "disk", "P", "M", "PM"]), encoding="utf-8")
     for i in range(0,68219):
     for i in range(0,68219):
             if df3[i]["machineid"]==None:
             if df3[i]["machineid"]==None:
                 pass
                 pass
@@ -23,9 +42,6 @@ def getWij():
                 # Wij[i][j]=
                 # Wij[i][j]=
                 pass
                 pass
 
 
-def import_data():
-    pass
 
 
 if __name__ == '__main__':
 if __name__ == '__main__':
-    getWij()
-
+    pass

+ 15 - 0
code/plot.py

@@ -0,0 +1,15 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+'''
+@Auther :liuyuqi.gov@msn.cn
+@Time :2018/7/6 16:59
+@File :plot.py
+'''
+import matplotlib
+matplotlib.use('Agg')
+import matplotlib.pyplot as plt  # 导入模块
+
+squares = [1, 4, 9, 16, 25]  # 指定列表Y坐标为列表中的值,X坐标为列表下标
+plt.plot(squares)  # 传入列表
+plt.show()
+plt.savefig("../submit/t1.jpg")

+ 27 - 5
code/save_conf.py

@@ -16,10 +16,31 @@ cf = ConfigParser()
 def write():
 def write():
     if not cf.has_section(section_name):
     if not cf.has_section(section_name):
         cf.add_section(section_name)
         cf.add_section(section_name)
-    cf.set(section_name, "app_interference", data_path+"scheduling_preliminary_app_interference_20180606.csv")
-    cf.set(section_name, "app_resources", data_path+"scheduling_preliminary_app_resources_20180606.csv")
-    cf.set(section_name, "instance_deploy", data_path+"duling_preliminary_instance_deploy_20180606.csv")
-    cf.set(section_name, "machine_resources", data_path+"scheduling_preliminary_machine_resources_20180606.csv")
+    cf.set(section_name, "app_interference", data_path + "scheduling_preliminary_app_interference_20180606.csv")
+    cf.set(section_name, "app_resources", data_path + "scheduling_preliminary_app_resources_20180606.csv")
+    cf.set(section_name, "instance_deploy", data_path + "scheduling_preliminary_instance_deploy_20180606.csv")
+    cf.set(section_name, "machine_resources", data_path + "scheduling_preliminary_machine_resources_20180606.csv")
+    cf.set(section_name, "instance", data_path + "instance.csv")
+    cf.set(section_name, "app", data_path + "app.csv")
+
+    if not cf.has_section("table_size"):
+        cf.add_section("table_size")
+    cf.set("table_size", "app_size", "9338")
+    cf.set("table_size", "machine_size", "6000")
+    cf.set("table_size", "instance_size", "68219")
+    cf.set("table_size", "app12_size", "35242")
+
+    if not cf.has_section("system_config"):
+        cf.add_section("system_config")
+    cf.set("system_config", "debug", "true")
+
+    if not cf.has_section("db_mysql"):
+        cf.add_section("db_mysql")
+    cf.set("db_mysql", "db_host", "localhost")
+    cf.set("db_mysql", "db_port", "3306")
+    cf.set("db_mysql", "db_user", "root")
+    cf.set("db_mysql", "db_pass", "1233456")
+
     with open(config_file, "w") as f:
     with open(config_file, "w") as f:
         cf.write(f)
         cf.write(f)
 
 
@@ -28,4 +49,5 @@ def read():
     cf.read(config_file)
     cf.read(config_file)
     print(cf.get(section_name, "app_interference"))
     print(cf.get(section_name, "app_interference"))
 
 
-write()
+
+write()

+ 123 - 0
code/sort_by_disk.py

@@ -0,0 +1,123 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+'''
+按照磁盘占用率从大到小装箱,即按照磁盘先用完为止进行分配实例到主机。
+@Auther :liuyuqi.gov@msn.cn
+@Time :2018/7/7 0:43
+@File :sort_by_disk.py
+'''
+
+import matplotlib
+
+matplotlib.use('Agg')
+import pandas as pd
+import matplotlib.pyplot as plt
+from configparser import ConfigParser
+
+cf = ConfigParser()
+config_path = "../conf/config.ini"
+section_name = "data_file_name"
+cf.read(config_path)
+
+app_interference = cf.get(section_name, "app_interference")
+app_resources = cf.get(section_name, "app_resources")
+instance_deploy = cf.get(section_name, "instance_deploy")
+machine_resources = cf.get(section_name, "machine_resources")
+app = cf.get(section_name, "app")
+instance = cf.get(section_name, "instance")
+# app
+df1 = pd.read_csv(app_resources, encoding="utf-8")
+
+# instance
+df3 = pd.read_csv(instance_deploy, header=None,
+                  names=list(["instanceid", "appid", "machineid"]))
+
+# machine
+# 其实就两类,所以就不需要导入数据了。
+
+# 限制表
+df4 = pd.read_csv(app_interference, header=None,
+                  names=list(["appid1", "appid2", "max_interference"]), encoding="utf-8")
+
+result = pd.DataFrame(columns=list(["instanceid"], "machineid"))
+
+tem_disk = tem_mem = tem_cpu = tem_P = tem_M = tem_PM = 0
+tmp_stand_cpu1 = 32
+tmp_stand_mem1 = 64
+tmp_stand_disk1 = 600
+
+tmp_stand_cpu2 = 92
+tmp_stand_mem2 = 288
+tmp_stand_disk2 = 600
+
+tmp_stand_P = 7
+tmp_stand_M1 = 3
+tmp_stand_M2 = 7
+tmp_stand_PM1 = 7
+tmp_stand_PM2 = 9
+
+machine_count = 0  # 3000小机器,3000大机器。所以在小机器用完换大机器
+j = 1  # j表示主机序号,从1-3000,3001到6000
+is_deploy = False  # 主机j是否部署了instance
+deploy_list = list()  # 主机j部署的instanceid实例
+
+
+# 各app之间的限制
+def restrictApp(instance, deploy_list):
+    # df4["appid1"]
+    # df4["appid2"]
+
+    return True
+
+
+# 执行部署方案
+def deplay():
+    mlength = df3["instanceid"].size()
+    while mlength > 0:
+        deployInstance(mlength)
+
+    result.to_csv("../submit/xx.csv")
+
+
+def deployInstance(mlength):
+    for i in range(0, mlength):
+        tem_disk = tem_disk + df3["disk"][i]  # 当前磁盘消耗
+        tem_mem = tem_mem + df3["mem"][i]
+        tem_cpu = tem_cpu + df3["cpu"][i]
+        tem_P = tem_P + df3["P"][i]
+        tem_M = tem_M + df3["M"][i]
+        tem_PM = tem_PM + df3["PM"][i]
+
+        if tem_disk < tmp_stand_disk1:  # 磁盘够
+            # if 满足限制表条件,则把当前实例部署到这台主机上。
+            if is_deploy == True:
+                if restrictApp(instance=df3["instanceid"], deploy_list=deploy_list):
+                    if tem_mem < tmp_stand_mem1:  # 内存够
+                        if tem_cpu < tmp_stand_cpu1:  # CPU够
+                            if tem_M < tmp_stand_M1:
+                                if tem_P < tmp_stand_P:
+                                    if tem_PM < tmp_stand_PM1:
+                                        result["machine"][i] = "machine_" + i
+            else:
+                # 主机j没有部署实例,则先部署一个
+                result["machine"][i] = "machine_" + i
+                is_deploy = True
+    # 整个instace都遍历了,第j主机无法再放入一个,所以添加j+1主机
+    j = j + 1
+
+
+def plotGroup():  # df3新建一列
+    df3["disk"] = None
+    for i in range(0, 68219):
+        df3["disk"][i] = lambda x: x[i], df1["disk"]
+
+    # instance分类统计
+    group1 = df3.groupby("appid").count()
+    print(type(group1))
+    print(group1["instanceid"].sort_values(ascending=False))
+    plt.plot(group1["instanceid"].sort_values(ascending=False))
+    plt.savefig("../submit/group1.jpg")
+
+    # 找到每个instance消耗的disk
+
+    # df3["disk"] =

+ 30 - 6
code/test_pandas.py

@@ -5,8 +5,7 @@
 @Time :2018/7/5 3:08
 @Time :2018/7/5 3:08
 @File :test_pandas.py
 @File :test_pandas.py
 '''
 '''
-import pandas as pd ,numpy as np
-
+import pandas as pd
 
 
 def t1():
 def t1():
     a = [['a', '1.2', '4.2'], ['b', '70', '0.03'], ['x', '5', '0']]
     a = [['a', '1.2', '4.2'], ['b', '70', '0.03'], ['x', '5', '0']]
@@ -14,20 +13,45 @@ def t1():
     print(df.dtypes)
     print(df.dtypes)
     print(df)
     print(df)
 
 
+
 def t2():
 def t2():
     obj = pd.Series(list('cadaabbcc'))
     obj = pd.Series(list('cadaabbcc'))
     uniques = obj.unique()
     uniques = obj.unique()
     print(obj.dtypes)
     print(obj.dtypes)
     print(uniques.shape)
     print(uniques.shape)
 
 
+
 def t3():
 def t3():
-    df=pd.DataFrame()
-    df2=pd.read_csv()
-    df3=pd.Series()
+    df = pd.DataFrame()
+    df2 = pd.read_csv()
+    df3 = pd.Series()
     pd.concat()
     pd.concat()
     pd.to_datetime()
     pd.to_datetime()
     pd.merge()
     pd.merge()
     pd.Timestamp
     pd.Timestamp
 
 
 
 
-t2()
+def t4():
+    df = pd.DataFrame(columns=list("AB"), data=[[1, 2], [3, 4]])
+    df["C"] = None
+    df["C"][1] = 2
+    print(df)
+
+
+def t5():
+    ser1 = pd.Series([1, 2, 3, 4])
+    ser2 = pd.Series(range(4), index=["a", "b", "c", "d"])
+    sdata = {'Ohio': 35000, 'Texas': 71000, 'Oregon': 16000, 'Utah': 5000}
+    ser3 = pd.Series(sdata)
+    # print(ser1)
+    print(ser2)
+
+    # 访问Series
+    ser2["a"]
+    # 所有索引
+    ser2.index
+    # 所有值
+    ser2.values
+
+
+t5()

+ 10 - 0
twtech/__init__.py

@@ -0,0 +1,10 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+'''
+@Auther :liuyuqi.gov@msn.cn
+@Time :2018/7/7 3:04
+@File :__init__.py.py
+'''
+
+if __name__ == '__main__':
+    pass

+ 27 - 0
twtech/config.py

@@ -0,0 +1,27 @@
+#!/usr/bin/env python
+# -*- coding: utf-8 -*-
+'''
+@Auther :liuyuqi.gov@msn.cn
+@Time :2018/7/7 3:04
+@File :config.py
+'''
+from configparser import ConfigParser
+
+cf = ConfigParser()
+config_path = "../conf/config.ini"
+section_name = "data_file_name"
+cf.read(config_path)
+
+
+class Config():
+    def __init__(self):
+        pass
+
+    def getConfig(self):
+        return self
+
+    def setConfig(self, db_mysql, sysconfig, file):
+        pass
+
+    def setConfigByDB(self, db_mysql):
+        self.db_mysql = ""