datatool.py 5.35 KB
Newer Older
jose's avatar
jose committed

# coding: utf-8
# -------------------------------------------------------------------
# 宝塔Linux面板
# -------------------------------------------------------------------
# Copyright (c) 2015-2017 宝塔软件(http:#bt.cn) All rights reserved.
# -------------------------------------------------------------------
# Author: 1249648969@qq.com
# -------------------------------------------------------------------

# ------------------------------
# 数据库工具类
# ------------------------------
import sys, os
os.chdir("/www/server/panel")
sys.path.append('class')
import panelMysql
import re,json,public

class datatools:
    DB_MySQL = None
    # 字节单位转换
    def ToSize(self, size):
        ds = ['b', 'KB', 'MB', 'GB', 'TB']
        for d in ds:
            if size < 1024: return ('%.2f' % size) + d
            size = size / 1024
        return '0b';

    # 获取当前数据库信息
    def GetdataInfo(self,get):
        '''
        传递一个数据库名称即可 get.databases
        '''
        if not self.DB_MySQL:self.DB_MySQL = panelMysql.panelMysql()
        db_name=get.db_name

        if not db_name:return False
        ret = {}
        tables = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
        if type(tables) == list:
            try:
                data = self.map_to_list(self.DB_MySQL.query("select sum(DATA_LENGTH)+sum(INDEX_LENGTH) from information_schema.tables  where table_schema='%s'" % db_name))[0][0]
            except:
                data=0

            if not data: data = 0
            ret['data_size'] = self.ToSize(data)
            ret['database'] = db_name

            ret3 = []
            for i in tables:
                if i == 1049: return public.returnMsg(False,'DB_NOT_EXIST')
                table = self.map_to_list(self.DB_MySQL.query("show table status from `%s` where name = '%s'" % (db_name, i[0])))
                if not table: continue
                try:
                    ret2 = {}
                    ret2['type']=table[0][1]
                    data_size = table[0][6]
                    ret2['rows_count'] = table[0][4]
                    ret2['collation'] = table[0][14]
                    ret2['data_size'] = self.ToSize(int(data_size))
                    ret2['table_name'] = i[0]
                    ret3.append(ret2)
                except: continue
            ret['tables'] = (ret3)
        return ret



    #修复表信息
    def RepairTable(self,get):
        
        '''
        POST:
        db_name=web
        tables=['web1','web2']
        '''
        db_name = get.db_name
        tables = json.loads(get.tables)
        if not db_name or not tables: return False
        if not self.DB_MySQL:self.DB_MySQL = panelMysql.panelMysql()
        mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
        ret=[]
        if type(mysql_table)==list:
            if len(mysql_table)>0:
                for i in mysql_table:
                    for i2 in tables:
                        if i2==i[0]:
                            ret.append(i2)
                if len(ret)>0:
                    for i in ret:
                        self.DB_MySQL.execute('REPAIR TABLE `%s`.`%s`'%(db_name,i))
                    return True 
        return False



    #map to list
    def map_to_list(self,map_obj):
        try:
            if type(map_obj) != list and type(map_obj) != str: map_obj = list(map_obj)
            return map_obj
        except: return []


    # 优化表
    def OptimizeTable(self,get):
        '''
        POST:
        db_name=web
        tables=['web1','web2']
        '''
        if not self.DB_MySQL:self.DB_MySQL = panelMysql.panelMysql()
        db_name = get.db_name
        tables = json.loads(get.tables)
        if not db_name or not tables: return False
        mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
        ret=[]
        if type(mysql_table) == list:
            if len(mysql_table) > 0:
                for i in mysql_table:
                    for i2 in tables:
                        if i2 == i[0]:
                            ret.append(i2)
                if len(ret)>0:
                    for i in ret:
                        self.DB_MySQL.execute('OPTIMIZE table `%s`.`%s` ENGINE=MyISAM' % (db_name,i))
                    return True 
        return False

    # 更改表引擎
    def AlterTable(self,get):
        '''
        POST:
        db_name=web
        table_type=innodb
        tables=['web1','web2']
        '''
        if not self.DB_MySQL:self.DB_MySQL = panelMysql.panelMysql()
        db_name = get.db_name
        table_type = get.table_type
        tables = json.loads(get.tables)

        if not db_name or not tables: return False
        
        mysql_table = self.map_to_list(self.DB_MySQL.query('show tables from `%s`' % db_name))
        ret=[]
        if type(mysql_table)==list:
            if len(mysql_table)>0:
                for i in mysql_table:
                    for i2 in tables:
                        if i2==i[0]:
                            ret.append(i2)
                if len(ret)>0:
                    for i in ret:
                        self.DB_MySQL.execute('alter table `%s`.`%s` ENGINE=`%s`' % (db_name,i,table_type))
                    return True
        return False


    #检查表
    def CheckTable(self,database,tables,*args,**kwargs):
        pass