Halo-Z 2021-04-21 16:04 采纳率: 50%
浏览 71
已结题

python多线程卡主

# coding=utf-8

import datetime
import os
import sys
import re
import itertools
import getopt
import time
from collections import OrderedDict
import subprocess
import getpass
import signal
import traceback
from Queue import Queue
from Queue import Empty
from threading import Thread
import thread
import logging


def exe_query(sql_str, dbname, gpport, gphost, gpuser):
    lst_of_col = []
    rc = None
    out = None
    err = None

    CMD = "PGDATABASE=%s PGPORT=%s PGHOST=%s PGUSER=%s PGOPTIONS='-c optimizer=off -c client_encoding=UTF8' " % (
        dbname, gpport, gphost, gpuser)
    CMD = CMD + "psql -R '%s' -tAXF '%s' -v ON_ERROR_STOP=1  <<END_OF_SQL \n" % ('\n', '@|$')
    CMD = CMD + sql_str + "\n"
    CMD = CMD + "END_OF_SQL"

    try:
        result = subprocess.Popen(CMD,
                                  shell = True,
                                  stdout = subprocess.PIPE,
                                  stderr = subprocess.PIPE,
                                  universal_newlines = True)
        out, err = result.communicate()
        rc = result.returncode
    except Exception as e:
        Logger(1).log(e)
    finally:
        if rc != 0:
            Logger(1).log(sql_str + '==>' + err)
            return err, rc
        else:
            lst_of_line = out.strip().split('\n', -1)
            for line in lst_of_line:
                lst_of_col.append(line.split('@|$', -1))
            return lst_of_col, rc


class Logger(object):
    def __init__(self, level=3):
        self.DEBUG = 5
        self.LOG = 4
        self.INFO = 3
        self.WARN = 2
        self.ERROR = 1
        self.options = Options()
        self.options.l = None
        self.options.qv = self.INFO
        self.level = level

        if self.options.l is None:
            self.options.l = os.path.join(os.environ.get('HOME', '.'), 'gpAdminLogs')
            if not os.path.isdir(self.options.l):
                os.mkdir(self.options.l)

            self.options.l = os.path.join(self.options.l,
                                          'dataload_' + datetime.date.today().strftime('%Y%m%d') + '.log')

        try:
            self.logfile = open(self.options.l, 'a')
        except Exception, e:
            self.log("could not open logfile %s:%s" % (self.options.l, e))

    # def __str__(self):
    #     print "Logger\n"

    def level_transfer(self, level):
        if level == self.DEBUG:
            return "DEBUG"
        elif level == self.LOG:
            return "LOG"
        elif level == self.INFO:
            return "INFO"
        elif level == self.ERROR:
            return "ERROR"
        elif level == self.WARN:
            return "WARN"
        else:
            self.log("unknown log type %i" % level)

    def log(self, a):
        """
        Level is either DEBUG, LOG, INFO, ERROR. a is the message
        """
        message = ''
        try:
            message = '|'.join(
                [datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S') + ' ', self.level_transfer(self.level), a]) + '\n'

            message = message.encode('utf-8')
        except Exception, e:
            self.logfile.write("\nWarning: Log() threw an exception: {0} \n".format(e))

        if self.level <= self.options.qv:
            sys.stdout.write(message)

        if self.level <= self.options.qv or self.level <= self.DEBUG:
            try:
                self.logfile.write(message)
                self.logfile.flush()
            except AttributeError, e:
                pass
        if self.level == self.ERROR:
            pass


class Options(object):

    def __init__(self):
        pass


class WorkerPool(object):
    """TODO:"""

    halt_command = 'halt command'

    def __init__(self, numWorkers=4, items=None, daemonize=False):
        if numWorkers <= 0:
            raise Exception("WorkerPool(): numWorkers should be greater than 0.")
        self.workers = []
        self.should_stop = False
        self.work_queue = Queue()
        self.completed_queue = Queue()
        self.num_assigned = 0
        self.daemonize = daemonize
        if items is not None:
            for item in items:
                self.work_queue.put(item)
                self.num_assigned += 1

        for i in range(0, numWorkers):
            w = Workers("worker%d" % i, self)
            self.workers.append(w)
            w.start()
        self.numWorkers = numWorkers

    ###
    def getNumWorkers(self):
        return self.numWorkers

    def getNextWorkItem(self):
        return self.work_queue.get(block = True)

    def addFinishedWorkItem(self, command):
        self.completed_queue.put(command)
        self.work_queue.task_done()

    def markTaskDone(self):
        self.work_queue.task_done()

    def addCommand(self, cmd):
        Logger().log("Adding cmd to work_queue: %s" % cmd.cmd_str)
        self.work_queue.put(cmd)
        self.num_assigned += 1

    def wait_and_printdots(self, command_count, quiet=True):
        while self.completed_queue.qsize() < command_count:
            time.sleep(1)

            if not quiet:
                sys.stdout.write(".")
                sys.stdout.flush()
        if not quiet:
            print " "
        self.join()

    def print_progress(self, command_count):
        while True:
            num_completed = self.completed_queue.qsize()
            num_completed_percentage = 0
            if command_count:
                num_completed_percentage = float(num_completed) / command_count
            # self.logger.info('%0.2f%% of jobs completed' % (num_completed_percentage * 100))
            if num_completed >= command_count:
                return
            self._join_work_queue_with_timeout(10)

    def _join_work_queue_with_timeout(self, timeout):
        done_condition = self.work_queue.all_tasks_done
        done_condition.acquire()
        try:
            while self.work_queue.unfinished_tasks:
                if timeout <= 0:
                    # Timed out.
                    return

                start_time = time.time()
                done_condition.wait(timeout)
                timeout -= (time.time() - start_time)
        finally:
            done_condition.release()

    def join(self):
        self.work_queue.join()
        return True

    def joinWorkers(self):
        for w in self.workers:
            w.join()

    def getCompletedItems(self):
        completed_list = []
        try:
            while True:
                item = self.completed_queue.get(False)  # will throw Empty
                if item is not None:
                    completed_list.append(item)
        except Empty:
            return completed_list

    # def check_results(self):
    #     """ goes through all items in the completed_queue and throws an exception at the
    #         first one that didn't execute successfully
    #
    #         throws ExecutionError
    #     """
    #     try:
    #         while True:
    #             item = self.completed_queue.get(False)
    #             if not item.get_results().wasSuccessful():
    #                 raise Exception("error")
    #     except Empty:
    #         return

    def empty_completed_items(self):
        while not self.completed_queue.empty():
            self.completed_queue.get(False)

    def isDone(self):
        # TODO: not sure that qsize() is safe
        return self.num_assigned == self.completed_queue.qsize()

    def haltWork(self):
        Logger.log("WorkerPool haltWork()")
        self.should_stop = True
        for w in self.workers:
            w.haltWork()
            self.work_queue.put(self.halt_command)


class Workers(Thread):
    """TODO:"""
    pool = None
    cmd = None
    name = None
    logger = None

    def __init__(self, name, pool):
        self.name = name
        self.pool = pool
        # self.logger = Logger()
        Thread.__init__(self)
        self.daemon = pool.daemonize

    def run(self):
        while True:
            try:
                try:
                    self.cmd = self.pool.getNextWorkItem()
                except TypeError:
                    # misleading exception raised during interpreter shutdown
                    return

                # we must have got a command to run here
                if self.cmd is None:
                    Logger().log("[%s] got a None cmd" % self.name)
                    self.pool.markTaskDone()
                elif self.cmd is self.pool.halt_command:
                    Logger().log("[%s] got a halt cmd" % self.name)
                    self.pool.markTaskDone()
                    self.cmd = None
                    return
                elif self.pool.should_stop:
                    Logger().log("[%s] got cmd and pool is stopped: %s" % (self.name, self.cmd))
                    self.pool.markTaskDone()
                    self.cmd = None
                else:
                    Logger().log("[%s] got cmd: %s" % (self.name, self.cmd.cmd_str))
                    self.cmd.run()
                    Logger().log("[%s] finished cmd: %s" % (self.name, self.cmd))
                    # self.pool.addFinishedWorkItem(self.cmd)
                    self.pool.markTaskDone()
                    self.cmd = None

            except Exception, e:
                Logger(1).log(e.message)
                if self.cmd:
                    Logger(1).log("[%s] finished cmd with exception: %s" % (self.name, self.cmd))
                    # self.pool.addFinishedWorkItem(self.cmd)
                    self.pool.markTaskDone()
                    self.cmd = None

    def haltWork(self):
        Logger().log("[%s] haltWork" % self.name)
        c = self.cmd
        if c is not None and isinstance(c, Command):
            c.interrupt()
            c.cancel()


class Command(object):
    """ TODO:
    """

    def __init__(self, name, cmd_str, remoteHost=None):
        self.cmd_str = cmd_str
        self.name = name
        self.result = None

    def __str__(self):
        # print "Command"
        return "Command"

    def run(self):

        pass

    def interrupt(self):
        pass

    def cancel(self):
        pass


class CreateExterlTable(Command):
    def __init__(self):

        Command.__init__(self, name = 'Create Exterl Table', cmd_str = None)


class GenerateInsertSql(Command):
    def __init__(self):

        Command.__init__(self, name = 'Generate Insert Sql', cmd_str = None)


class DataManage(Command):
    name = "Data Manage"

    def __init__(self, name, dest_host, dest_port, dest_user, dest_db, sour_host, sour_port,
                 sour_user, sour_db, table_pair):
        self._pool = None
        self._dest_host = dest_host
        self._dest_port = dest_port
        self._dest_user = dest_user
        self._dest_db = dest_db
        self._sour_user = sour_user
        self._sour_port = sour_port
        self._sour_host = sour_host
        self._sour_db = sour_db
        self._table_pair = table_pair
        self.name = name
        self.log = Logger()
        Command.__init__(self, self.name, None)

    def __str__(self):
        return self.name

    def parse_table_pair(self, flag=True):
        # Logger().log('starting parse table %s\n' % self._table_pair)
        query_str = 'insert into from '
        return query_str

    def _insert_from_source(self):
        Logger().log("starting insert from source for %s" % self._table_pair)
        try:
            # self._table_pair
            query_str = self.parse_table_pair()
            # exe_query(query_str, self._sour_db, self._sour_port, self._sour_host, self._sour_user)
        except Exception, e:
            Logger().log(e)
        finally:
            thread.exit()

    def _insert_to_destination(self):
        Logger().log("starting insert to destination for %s" % self._table_pair)
        try:
            # self._table_pair
            query_str = self.parse_table_pair()
            # exe_query(query_str, self._sour_db, self._sour_port, self._sour_host, self._sour_user)
        except Exception, e:
            Logger().log(e)
        finally:
            thread.exit()

    def _copy_data(self):
        # self._pool.empty_completed_items()
        source_thread = Thread(target = self._insert_from_source)
        source_thread.start()
        destination_thread = Thread(target = self._insert_to_destination)
        destination_thread.start()

        while True:
            if source_thread.is_alive():
                source_thread.join()
            elif destination_thread.is_alive():
                destination_thread.join()
            else:
                break

        Logger().log('copy data success')

    def run(self):

        self._copy_data()


class DataCopy(object):
    def __init__(self):
        # self.table_pair = ['public.test01', 'public.test02', 'public.test03', 'public.test04']
        self.table_pair = ['public.test01', 'public.test02']
        self._pool = None
        self._dest_host = '192.168.126.30'
        self._dest_port = '5432'
        self._dest_user = 'gpadmin'
        self._dest_db = 'work'
        self._sour_user = 'gpadmin6'
        self._sour_port = '5436'
        self._sour_host = '192.168.126.30'
        self._sour_db = 'work'
        self.name = 'data manager'

    def run(self):
        self._pool = WorkerPool(2)
        for t in self.table_pair:

            cmd = DataManage(name = t,
                             dest_host = self._dest_host,
                             dest_port = self._dest_port,
                             dest_db = self._dest_db,
                             dest_user = self._dest_user,
                             sour_db = self._sour_db,
                             sour_host = self._sour_host,
                             sour_port = self._sour_port,
                             sour_user = self._sour_user,
                             table_pair = t)
            self._pool.addCommand(cmd)
            self._pool.join()

        # self._pool.joinWorkers()

        Logger().log('======>Exiting Main Work.<======')


if __name__ == '__main__':
    dc = DataCopy()
    dc.run()

这个样例程序运行的时候总是会卡主,线程退不出来。有哪位大神能帮忙看看的

  • 写回答

1条回答 默认 最新

  • Halo-Z 2021-04-21 18:48
    关注

    加了empty判断,空则break否则继续while循环,这样能正常退出来

    评论

报告相同问题?

悬赏问题

  • ¥15 halcon ocr mlp 识别问题
  • ¥15 已知曲线满足正余弦函数,根据其峰值,还原出整条曲线
  • ¥20 无法创建新的堆栈防护界面
  • ¥15 sessionStorage在vue中的用法
  • ¥15 wordpress更换域名后用户图片头像不显示
  • ¥15 如何在ubunto上安装CEF (Chromium Embedded Framework),并且基于qt实现打开一个web
  • ¥30 AD9854 为什么输出波形幅度受限,AI机器人勿扰
  • ¥15 如何在ubunto上安装CEF (Chromium Embedded Framework
  • ¥15 如何联系真正的开发者而非公司
  • ¥15 有偿求苍穹外卖环境配置