# coding=utf-8
import datetime
import os
import sys
import re
import itertools
import getopt
import time
from collections import OrderedDict
import subprocess
import getpass
import signal
import traceback
from Queue import Queue
from Queue import Empty
from threading import Thread
import thread
import logging
def exe_query(sql_str, dbname, gpport, gphost, gpuser):
lst_of_col = []
rc = None
out = None
err = None
CMD = "PGDATABASE=%s PGPORT=%s PGHOST=%s PGUSER=%s PGOPTIONS='-c optimizer=off -c client_encoding=UTF8' " % (
dbname, gpport, gphost, gpuser)
CMD = CMD + "psql -R '%s' -tAXF '%s' -v ON_ERROR_STOP=1 <<END_OF_SQL \n" % ('\n', '@|$')
CMD = CMD + sql_str + "\n"
CMD = CMD + "END_OF_SQL"
try:
result = subprocess.Popen(CMD,
shell = True,
stdout = subprocess.PIPE,
stderr = subprocess.PIPE,
universal_newlines = True)
out, err = result.communicate()
rc = result.returncode
except Exception as e:
Logger(1).log(e)
finally:
if rc != 0:
Logger(1).log(sql_str + '==>' + err)
return err, rc
else:
lst_of_line = out.strip().split('\n', -1)
for line in lst_of_line:
lst_of_col.append(line.split('@|$', -1))
return lst_of_col, rc
class Logger(object):
def __init__(self, level=3):
self.DEBUG = 5
self.LOG = 4
self.INFO = 3
self.WARN = 2
self.ERROR = 1
self.options = Options()
self.options.l = None
self.options.qv = self.INFO
self.level = level
if self.options.l is None:
self.options.l = os.path.join(os.environ.get('HOME', '.'), 'gpAdminLogs')
if not os.path.isdir(self.options.l):
os.mkdir(self.options.l)
self.options.l = os.path.join(self.options.l,
'dataload_' + datetime.date.today().strftime('%Y%m%d') + '.log')
try:
self.logfile = open(self.options.l, 'a')
except Exception, e:
self.log("could not open logfile %s:%s" % (self.options.l, e))
# def __str__(self):
# print "Logger\n"
def level_transfer(self, level):
if level == self.DEBUG:
return "DEBUG"
elif level == self.LOG:
return "LOG"
elif level == self.INFO:
return "INFO"
elif level == self.ERROR:
return "ERROR"
elif level == self.WARN:
return "WARN"
else:
self.log("unknown log type %i" % level)
def log(self, a):
"""
Level is either DEBUG, LOG, INFO, ERROR. a is the message
"""
message = ''
try:
message = '|'.join(
[datetime.datetime.today().strftime('%Y-%m-%d %H:%M:%S') + ' ', self.level_transfer(self.level), a]) + '\n'
message = message.encode('utf-8')
except Exception, e:
self.logfile.write("\nWarning: Log() threw an exception: {0} \n".format(e))
if self.level <= self.options.qv:
sys.stdout.write(message)
if self.level <= self.options.qv or self.level <= self.DEBUG:
try:
self.logfile.write(message)
self.logfile.flush()
except AttributeError, e:
pass
if self.level == self.ERROR:
pass
class Options(object):
def __init__(self):
pass
class WorkerPool(object):
"""TODO:"""
halt_command = 'halt command'
def __init__(self, numWorkers=4, items=None, daemonize=False):
if numWorkers <= 0:
raise Exception("WorkerPool(): numWorkers should be greater than 0.")
self.workers = []
self.should_stop = False
self.work_queue = Queue()
self.completed_queue = Queue()
self.num_assigned = 0
self.daemonize = daemonize
if items is not None:
for item in items:
self.work_queue.put(item)
self.num_assigned += 1
for i in range(0, numWorkers):
w = Workers("worker%d" % i, self)
self.workers.append(w)
w.start()
self.numWorkers = numWorkers
###
def getNumWorkers(self):
return self.numWorkers
def getNextWorkItem(self):
return self.work_queue.get(block = True)
def addFinishedWorkItem(self, command):
self.completed_queue.put(command)
self.work_queue.task_done()
def markTaskDone(self):
self.work_queue.task_done()
def addCommand(self, cmd):
Logger().log("Adding cmd to work_queue: %s" % cmd.cmd_str)
self.work_queue.put(cmd)
self.num_assigned += 1
def wait_and_printdots(self, command_count, quiet=True):
while self.completed_queue.qsize() < command_count:
time.sleep(1)
if not quiet:
sys.stdout.write(".")
sys.stdout.flush()
if not quiet:
print " "
self.join()
def print_progress(self, command_count):
while True:
num_completed = self.completed_queue.qsize()
num_completed_percentage = 0
if command_count:
num_completed_percentage = float(num_completed) / command_count
# self.logger.info('%0.2f%% of jobs completed' % (num_completed_percentage * 100))
if num_completed >= command_count:
return
self._join_work_queue_with_timeout(10)
def _join_work_queue_with_timeout(self, timeout):
done_condition = self.work_queue.all_tasks_done
done_condition.acquire()
try:
while self.work_queue.unfinished_tasks:
if timeout <= 0:
# Timed out.
return
start_time = time.time()
done_condition.wait(timeout)
timeout -= (time.time() - start_time)
finally:
done_condition.release()
def join(self):
self.work_queue.join()
return True
def joinWorkers(self):
for w in self.workers:
w.join()
def getCompletedItems(self):
completed_list = []
try:
while True:
item = self.completed_queue.get(False) # will throw Empty
if item is not None:
completed_list.append(item)
except Empty:
return completed_list
# def check_results(self):
# """ goes through all items in the completed_queue and throws an exception at the
# first one that didn't execute successfully
#
# throws ExecutionError
# """
# try:
# while True:
# item = self.completed_queue.get(False)
# if not item.get_results().wasSuccessful():
# raise Exception("error")
# except Empty:
# return
def empty_completed_items(self):
while not self.completed_queue.empty():
self.completed_queue.get(False)
def isDone(self):
# TODO: not sure that qsize() is safe
return self.num_assigned == self.completed_queue.qsize()
def haltWork(self):
Logger.log("WorkerPool haltWork()")
self.should_stop = True
for w in self.workers:
w.haltWork()
self.work_queue.put(self.halt_command)
class Workers(Thread):
"""TODO:"""
pool = None
cmd = None
name = None
logger = None
def __init__(self, name, pool):
self.name = name
self.pool = pool
# self.logger = Logger()
Thread.__init__(self)
self.daemon = pool.daemonize
def run(self):
while True:
try:
try:
self.cmd = self.pool.getNextWorkItem()
except TypeError:
# misleading exception raised during interpreter shutdown
return
# we must have got a command to run here
if self.cmd is None:
Logger().log("[%s] got a None cmd" % self.name)
self.pool.markTaskDone()
elif self.cmd is self.pool.halt_command:
Logger().log("[%s] got a halt cmd" % self.name)
self.pool.markTaskDone()
self.cmd = None
return
elif self.pool.should_stop:
Logger().log("[%s] got cmd and pool is stopped: %s" % (self.name, self.cmd))
self.pool.markTaskDone()
self.cmd = None
else:
Logger().log("[%s] got cmd: %s" % (self.name, self.cmd.cmd_str))
self.cmd.run()
Logger().log("[%s] finished cmd: %s" % (self.name, self.cmd))
# self.pool.addFinishedWorkItem(self.cmd)
self.pool.markTaskDone()
self.cmd = None
except Exception, e:
Logger(1).log(e.message)
if self.cmd:
Logger(1).log("[%s] finished cmd with exception: %s" % (self.name, self.cmd))
# self.pool.addFinishedWorkItem(self.cmd)
self.pool.markTaskDone()
self.cmd = None
def haltWork(self):
Logger().log("[%s] haltWork" % self.name)
c = self.cmd
if c is not None and isinstance(c, Command):
c.interrupt()
c.cancel()
class Command(object):
""" TODO:
"""
def __init__(self, name, cmd_str, remoteHost=None):
self.cmd_str = cmd_str
self.name = name
self.result = None
def __str__(self):
# print "Command"
return "Command"
def run(self):
pass
def interrupt(self):
pass
def cancel(self):
pass
class CreateExterlTable(Command):
def __init__(self):
Command.__init__(self, name = 'Create Exterl Table', cmd_str = None)
class GenerateInsertSql(Command):
def __init__(self):
Command.__init__(self, name = 'Generate Insert Sql', cmd_str = None)
class DataManage(Command):
name = "Data Manage"
def __init__(self, name, dest_host, dest_port, dest_user, dest_db, sour_host, sour_port,
sour_user, sour_db, table_pair):
self._pool = None
self._dest_host = dest_host
self._dest_port = dest_port
self._dest_user = dest_user
self._dest_db = dest_db
self._sour_user = sour_user
self._sour_port = sour_port
self._sour_host = sour_host
self._sour_db = sour_db
self._table_pair = table_pair
self.name = name
self.log = Logger()
Command.__init__(self, self.name, None)
def __str__(self):
return self.name
def parse_table_pair(self, flag=True):
# Logger().log('starting parse table %s\n' % self._table_pair)
query_str = 'insert into from '
return query_str
def _insert_from_source(self):
Logger().log("starting insert from source for %s" % self._table_pair)
try:
# self._table_pair
query_str = self.parse_table_pair()
# exe_query(query_str, self._sour_db, self._sour_port, self._sour_host, self._sour_user)
except Exception, e:
Logger().log(e)
finally:
thread.exit()
def _insert_to_destination(self):
Logger().log("starting insert to destination for %s" % self._table_pair)
try:
# self._table_pair
query_str = self.parse_table_pair()
# exe_query(query_str, self._sour_db, self._sour_port, self._sour_host, self._sour_user)
except Exception, e:
Logger().log(e)
finally:
thread.exit()
def _copy_data(self):
# self._pool.empty_completed_items()
source_thread = Thread(target = self._insert_from_source)
source_thread.start()
destination_thread = Thread(target = self._insert_to_destination)
destination_thread.start()
while True:
if source_thread.is_alive():
source_thread.join()
elif destination_thread.is_alive():
destination_thread.join()
else:
break
Logger().log('copy data success')
def run(self):
self._copy_data()
class DataCopy(object):
def __init__(self):
# self.table_pair = ['public.test01', 'public.test02', 'public.test03', 'public.test04']
self.table_pair = ['public.test01', 'public.test02']
self._pool = None
self._dest_host = '192.168.126.30'
self._dest_port = '5432'
self._dest_user = 'gpadmin'
self._dest_db = 'work'
self._sour_user = 'gpadmin6'
self._sour_port = '5436'
self._sour_host = '192.168.126.30'
self._sour_db = 'work'
self.name = 'data manager'
def run(self):
self._pool = WorkerPool(2)
for t in self.table_pair:
cmd = DataManage(name = t,
dest_host = self._dest_host,
dest_port = self._dest_port,
dest_db = self._dest_db,
dest_user = self._dest_user,
sour_db = self._sour_db,
sour_host = self._sour_host,
sour_port = self._sour_port,
sour_user = self._sour_user,
table_pair = t)
self._pool.addCommand(cmd)
self._pool.join()
# self._pool.joinWorkers()
Logger().log('======>Exiting Main Work.<======')
if __name__ == '__main__':
dc = DataCopy()
dc.run()
这个样例程序运行的时候总是会卡主,线程退不出来。有哪位大神能帮忙看看的