lutsest
2021-08-21 03:38
采纳率: 0%
浏览 264
已结题

请教一个 关于调用jar包的python 程序

在校学生一枚,需要用一段 调用jar包的python程序,无奈没有python基础,请各位帮忙看下,教教我如何使用?因为包内没给任何使用说明文件,主模块代码也就200行左右,我贴到下面,还请帮忙:

请教这个包如何使用,该怎样运行起来,模型可以输入输出,里面的函数能赋值运行就行。

包内调用了一个 jar 包,这个 jar 包通过 cmd 命令行运行的方式我已经会用了,但是通过这段 python 代码来调用的方法我还是很懵。

所以我的诉求是:能够让下面的代码跑起来,给它该给的参数,并输出相应的结果。关于酬金,可以根据任务量再调整。

java和python的环境都已经ok,python编译器是 anaconda 的python3.8,当然下面代码是 python2.*,需要使用2to3转换一下。

库也可以自行下载:pip install cf-clus

包内共8个文件,含有效代码的是3个文件:

第一个: library.py

from subprocess import check_output, CalledProcessError
from tempfile import NamedTemporaryFile
import os
import ConfigParser
import json

import subprocess
import StringIO


def _clus_write_temp(suffix, contents, created_paths):
    """Write *contents* to a fresh named temporary file and return its path.

    The path is appended to *created_paths* before anything is written, so the
    caller can still remove the file if the write itself fails.
    """
    handle = NamedTemporaryFile(suffix=suffix, delete=False)
    created_paths.append(handle.name)
    try:
        handle.write(contents)
    finally:
        handle.close()
    return handle.name


def _clus_read_file(path):
    """Return the raw contents of *path*, or None when it cannot be read."""
    try:
        with open(path, 'rb') as handle:
            return handle.read()
    except EnvironmentError:
        return None


def clus(input_dict):
    """Run the bundled ``bin/Clus.jar`` on the given data and collect its results.

    Keys read from *input_dict*:
        'arff'     -- contents of the training ARFF file (required).
        'settings' -- contents of a CLUS settings file, or None for empty
                      settings (the key itself is required).
        'prune'    -- optional contents of an ARFF file, set as Data/PruneSet.
        'test'     -- optional contents of an ARFF file, set as Data/TestSet.
        'args'     -- optional string of extra command-line arguments,
                      separated by whitespace; ';' and '|' are stripped.

    Returns a dict with the keys 'output', 'settings', 'models', 'default',
    'original', 'pruned' and 'fimp'.  'output' is the contents of the ``.out``
    file when it can be read, otherwise whatever CLUS printed on stdout.  The
    JSON-derived entries are all None when the ``.json`` file is missing or
    cannot be interpreted.

    Raises Exception when java exits with a non-zero status or when its
    stderr contains "Error: ".
    """
    created_paths = []
    try:
        # First we write the ARFF file and the settings file into temporary files.
        arff_path = _clus_write_temp('.arff', input_dict['arff'], created_paths)
        settings_text = input_dict['settings']
        if settings_text is None:
            settings_text = ''
        settings_path = _clus_write_temp('.s', settings_text, created_paths)

        # We need to change the filenames in the settings.
        settings = ConfigParser.RawConfigParser()
        settings.optionxform = str  # keep option names case-sensitive
        settings.read(settings_path)
        if not settings.has_section('Data'):
            settings.add_section('Data')
        settings.set('Data', 'File', arff_path)

        # Optional prune / test sets get their own temporary ARFF files.
        for input_key, option in (('prune', 'PruneSet'), ('test', 'TestSet')):
            contents = input_dict.get(input_key, None)
            if contents is not None:
                extra_path = _clus_write_temp('.arff', contents, created_paths)
                settings.set('Data', option, extra_path)

        # We need to enable ClowdFlows output.
        if not settings.has_section('Output'):
            settings.add_section('Output')
        settings.set('Output', 'OutputClowdFlowsJSON', 'Yes')

        with open(settings_path, mode='wb') as settings_file:
            settings.write(settings_file)

        # split() without arguments drops the empty strings that repeated
        # spaces would otherwise pass to java as bogus arguments.
        raw_args = input_dict.get('args') or ''
        args = raw_args.replace(";", "").replace("|", "").split()

        # The result files are read from next to the settings file.  Only the
        # final extension is swapped: a plain str.replace(".s", ...) would also
        # rewrite any other ".s" occurring earlier in the path.
        output_base = os.path.splitext(settings_path)[0]
        out_path = output_base + '.out'
        fimp_path = output_base + '.fimp'
        json_path = output_base + '.json'
        created_paths.extend([out_path, fimp_path, json_path])

        # Execute CLUS.
        clus_path = os.path.join(os.path.abspath(os.path.dirname(__file__)), 'bin', 'Clus.jar')
        process = subprocess.Popen(["java", "-jar", clus_path] + args + [settings_path],
                                   stdout=subprocess.PIPE, stderr=subprocess.PIPE)
        # communicate() drains both pipes and waits for the process, so the
        # return code is actually available and a full stderr pipe cannot
        # block the child while we are still reading stdout.
        output, error = process.communicate()

        if process.returncode != 0:
            raise Exception("There was an error when running CLUS: " + str(error) + " (Error code: " + str(
                process.returncode) + ")")

        if "Error: " in error:
            raise Exception(error.strip())

        # Prefer the .out file over stdout when it exists.
        file_output = _clus_read_file(out_path)
        if file_output is not None:
            output = file_output

        fimp = _clus_read_file(fimp_path)
        if fimp is None:
            fimp = ""

        returned_settings = None
        models = None
        default = None
        original = None
        pruned = None

        json_text = _clus_read_file(json_path)
        if json_text is not None:
            try:
                json_contents = json.loads(json_text)
                parsed_settings = json_contents['settings']
                parsed_models = json_contents['models']
                found = {'Default': {}, 'Original': {}, 'Pruned': {}}
                for model in parsed_models:
                    for model_name in ('Default', 'Original', 'Pruned'):
                        if model['name'] == model_name:
                            found[model_name] = model['representation']
            except Exception:
                # Best effort, as before: an unusable JSON file leaves all
                # JSON-derived results as None instead of failing the run.
                pass
            else:
                returned_settings = parsed_settings
                models = parsed_models
                default = found['Default']
                original = found['Original']
                pruned = found['Pruned']

        return {
            'output': output,
            'settings': returned_settings,
            'models': models,
            'default': default,
            'original': original,
            'pruned': pruned,
            'fimp': fimp,
        }
    finally:
        # We remove all temporary files, also when CLUS failed.  Files are
        # closed before this point, so the removal also works on Windows.
        for path in created_paths:
            try:
                os.unlink(path)
            except EnvironmentError:
                pass


def clus_display_svg(input_dict):
    """Return an empty result dict; *input_dict* is not inspected."""
    return dict()


def clus_display_tree(input_dict):
    """Return an empty result dict; *input_dict* is not inspected."""
    return dict()


def clus_display_tree_and_examples(input_dict):
    """Reject multi-tree input and otherwise return an empty result dict.

    Raises Exception when ``input_dict['classifier']`` is a list (including
    list subclasses), because this widget handles a single tree only.
    """
    if isinstance(input_dict['classifier'], list):
        raise Exception("This widget does not work on multiple trees.")
    return {}


def clus_display_tree_and_summary(input_dict):
    """Return an empty result dict; *input_dict* is not inspected."""
    return dict()


def handle_setting(name, input_dict, section, settings, checkbox=False):
    """Copy one option from *input_dict* into the config object *settings*.

    A value counts as "provided" when it is not None, not blank after
    stripping, and not the literal string "null".

    * checkbox=False: a provided value is stored as-is under
      ``[section] name``; otherwise nothing is written.
    * checkbox=True: "Yes" is stored when a value is provided, "No" otherwise.

    The section is created when it does not exist yet.
    """
    raw_value = input_dict.get(name, None)
    provided = raw_value is not None and raw_value.strip() != "" and raw_value != "null"

    if checkbox:
        stored = "Yes" if provided else "No"
    elif provided:
        stored = input_dict[name]
    else:
        # Plain option without a usable value: leave the settings untouched.
        return

    if not settings.has_section(section):
        settings.add_section(section)
    settings.set(section, name, stored)


def clus_generate_settings(input_dict):
    """Build the text of a CLUS settings file from the values in *input_dict*.

    Each known option is passed through handle_setting(), which decides
    whether it is written.  Returns ``{'settings': <settings text>}``.
    """
    parser = ConfigParser.RawConfigParser()
    parser.optionxform = str  # keep option names case-sensitive

    # (section, option names), in the order the sections are first touched.
    text_options = (
        ("General", ("RandomSeed",)),
        ("Attributes", ("Target", "Clustering", "Disable", "Key", "Weights")),
        ("Model", ("MinimalWeight",)),
        ("Tree", ("FTest", "SplitSampling", "Heuristic", "PruningMethod",
                  "InductionOrder", "EntropyType")),
    )
    for section, option_names in text_options:
        for option_name in option_names:
            handle_setting(option_name, input_dict, section, parser)

    # The only checkbox option: always written, as "Yes" or "No".
    handle_setting("BranchFrequency", input_dict, "Output", parser, checkbox=True)

    rendered = StringIO.StringIO()
    parser.write(rendered)
    return {'settings': rendered.getvalue()}


第2个 visualization_views.py

from django.shortcuts import render
import os
from utils import clus_tree_to_dot, clus_tree_to_node_edge, get_instance_nodes
from random import random

from django.conf import settings
import arff


def clus_display_svg(request, input_dict, output_dict, widget):
    """Visualization displaying a decision tree.

    Converts ``input_dict['classifier']`` (one tree, or a list of models each
    carrying a 'representation') to Graphviz dot text, writes it under
    MEDIA_ROOT in a directory named after the requesting user's id, runs
    ``dot`` to produce a PNG (when ``input_dict['img_type'] == 'raster'``) or
    an SVG, and renders the template that displays the image.
    """

    import subprocess
    from mothra.settings import MEDIA_ROOT
    from workflows.helpers import ensure_dir

    img_type = 'png' if input_dict['img_type'] == 'raster' else 'svg'

    classifier = input_dict['classifier']
    if isinstance(classifier, list):
        # Several trees share one digraph; node ids continue across trees.
        fragments = []
        starting_id = 0
        for cls in classifier:
            dot_representation, starting_id = clus_tree_to_dot(cls['representation'], starting_id)
            fragments.append(dot_representation + "\n")
        dot_body = "".join(fragments)
    else:
        dot_body = clus_tree_to_dot(classifier, 0)[0]
    dot_text = "digraph Tree {\n" + dot_body + "}"

    filename = '/'.join([str(request.user.id), 'decisionTree-clus-%d.dot' % widget.id])
    dotfile = filename
    destination_dot = '/'.join([MEDIA_ROOT, filename])
    ensure_dir(destination_dot)

    with open(destination_dot, 'w') as dot_file:
        dot_file.write(dot_text)

    # png/svg file
    filename = '/'.join([str(request.user.id),
                         'decisionTree-clus-%d.%s' % (widget.id, img_type)
                         ])
    destination_img = os.path.join(MEDIA_ROOT, filename)
    ensure_dir(destination_img)

    # Fall back to whatever "dot" is on PATH when DOT_PATH is not configured.
    dot_path = getattr(settings, 'DOT_PATH', 'dot')

    # An argument list (no shell) keeps paths containing spaces or shell
    # metacharacters intact.  NOTE(review): this assumes DOT_PATH names only
    # the executable, without extra arguments -- confirm for deployments.
    subprocess.call([dot_path, "-T%s" % img_type, destination_dot, "-o", destination_img])

    return render(request,
                  'visualizations/cf_clus_display_svg_tree.html',
                  {'filename': filename,
                   'dotfile': dotfile,
                   'random': int(random() * 10000000),
                   'widget': widget,
                   'input_dict': input_dict})


def clus_display_tree(request, input_dict, output_dict, widget):
    """Visualization displaying a decision tree.

    ``input_dict['classifier']`` is either a single tree or a list of models
    that each carry a 'representation' tree.  The tree(s) are flattened into
    node and edge lists and passed to the template.
    """

    classifier = input_dict['classifier']
    if isinstance(classifier, list):
        nodes, edges = [], []
        starting_id = 0
        # Node ids continue across trees so they stay unique in one graph.
        for cls in classifier:
            new_nodes, new_edges, starting_id = clus_tree_to_node_edge(cls['representation'], starting_id)
            nodes += new_nodes
            edges += new_edges
    else:
        nodes, edges, _ = clus_tree_to_node_edge(classifier, 0)

    return render(request,
                  'visualizations/cf_clus_display_tree.html',
                  {
                      'widget': widget,
                      'input_dict': input_dict,
                      'nodes': nodes,
                      'edges': edges
                  })


def clus_display_tree_and_examples(request, input_dict, output_dict, widget):
    """Visualization displaying a decision tree and the examples in the tree.

    The tree is flattened into node/edge lists first; that call also stores a
    'dot_id' on every tree node, which get_instance_nodes() reads afterwards
    to report the node ids each ARFF instance passes through.
    """

    tree = input_dict['classifier']
    nodes, edges, _next_id = clus_tree_to_node_edge(tree, 0)

    data = arff.loads(input_dict['arff'])

    datanodes = [
        {'data': row, 'nodes': get_instance_nodes(tree, row, data['attributes'])}
        for row in data['data']
    ]

    context = {
        'widget': widget,
        'input_dict': input_dict,
        'nodes': nodes,
        'edges': edges,
        'data': data,
        'datanodes': datanodes,
        'random': int(random() * 10000000),
    }
    return render(request,
                  'visualizations/cf_clus_display_tree_and_examples.html',
                  context)


def clus_display_tree_and_summary(request, input_dict, output_dict, widget):
    """Visualization displaying a decision tree and the summary.

    ``input_dict['classifier']`` is either a single tree or a list of models
    that each carry a 'representation' tree.  The attribute names shown in
    the summary are read from ``summary['names']`` of the (first) tree; an
    empty list of models yields empty nodes, edges and attributes instead of
    an IndexError.
    """

    classifier = input_dict['classifier']
    if isinstance(classifier, list):
        nodes, edges = [], []
        starting_id = 0
        attributes = classifier[0]['representation']['summary']['names'] if classifier else []
        # Node ids continue across trees so they stay unique in one graph.
        for cls in classifier:
            new_nodes, new_edges, starting_id = clus_tree_to_node_edge(cls['representation'], starting_id)
            nodes += new_nodes
            edges += new_edges
    else:
        nodes, edges, _ = clus_tree_to_node_edge(classifier, 0)
        attributes = classifier['summary']['names']

    return render(request,
                  'visualizations/cf_clus_display_tree_and_summary.html',
                  {
                      'widget': widget,
                      'input_dict': input_dict,
                      'nodes': nodes,
                      'edges': edges,
                      'attributes': attributes,
                      'random': int(random() * 10000000),
                  })


第3 个: utils.py

def clus_tree_to_dot(node, node_index):
    """Render the tree rooted at *node* as Graphviz dot statements.

    Ids are handed out in pre-order starting at *node_index*, and each
    visited node dict gets its id stored under 'dot_id'.  Nodes with a
    'children' key are labelled with their 'test_string'; all other nodes are
    drawn as filled boxes labelled with 'target_stat' (a literal ``\\n`` is
    inserted after every comma).

    Returns ``(dot_text, next_free_index)``.
    """
    node['dot_id'] = node_index

    if 'children' not in node:
        leaf_label = node['target_stat'].replace(',', ',\\n')
        leaf_text = "N" + str(node_index) + ' [label="' + leaf_label + '" shape=box style=filled ]\n'
        return leaf_text, node_index + 1

    pieces = ["N" + str(node_index) + ' [label="' + node['test_string'] + '"]\n']
    next_index = node_index + 1
    for child in node['children']:
        subtree_text, next_index = clus_tree_to_dot(child, next_index)
        pieces.append(subtree_text)
        # The edge is emitted after the child's subtree, once its id is known.
        pieces.append("N" + str(node['dot_id']) + "->N" + str(child['dot_id'])
                      + '[label="' + child['branch_label'] + '"]' + "\n")
    return "".join(pieces), next_index


def clus_tree_to_node_edge(node, node_index):
    """Flatten the tree rooted at *node* into node and edge dictionaries.

    Ids are handed out in pre-order starting at *node_index*, and each
    visited node dict gets its id stored under 'dot_id'.  Every produced node
    entry carries id, label, shape, target_stat, title and the min / max /
    stddev / avg values from the node's 'summary'.

    Returns ``(nodes, edges, next_free_index)``.
    """
    node['dot_id'] = node_index
    is_leaf = 'children' not in node

    stat = node['target_stat']
    multiline_stat = stat.replace(',', ',\\n')
    summary = node['summary']

    entry = {
        'id': node_index,
        'title': stat.replace(',', ',<br>'),
        'min': summary['min'],
        'max': summary['max'],
        'stddev': summary['stddev'],
        'avg': summary['avg'],
    }

    if is_leaf:
        # Leaves show the statistic as their label and keep it unmodified
        # under 'target_stat'.
        entry['label'] = multiline_stat
        entry['shape'] = 'box'
        entry['target_stat'] = stat
        return [entry], [], node_index + 1

    entry['label'] = node['test_string']
    entry['shape'] = 'ellipse'
    entry['target_stat'] = multiline_stat

    nodes = [entry]
    edges = []
    cursor = node_index + 1
    for child in node['children']:
        sub_nodes, sub_edges, cursor = clus_tree_to_node_edge(child, cursor)
        nodes.extend(sub_nodes)
        edges.extend(sub_edges)
        edges.append({'from': node['dot_id'], 'to': child['dot_id'], 'label': child['branch_label']})
    return nodes, edges, cursor


def perform_test(test_string, instance, attributes):
    """Evaluate a tree node's test on one data instance.

    *test_string* has one of the forms ``name > number``, ``name = value``,
    ``name <= number`` or ``name in {v1, v2, ...}``; anything from the first
    `` (`` onwards is dropped before parsing.  *attributes* is a sequence
    whose items have the attribute name at index 0, in the same order as the
    values in *instance*.

    Returns the result of the comparison, or None when the test has no
    recognised form or names an attribute that is not in *attributes*.
    Raises ValueError when a ``>`` / ``<=`` threshold is not a number.
    """
    if '(' in test_string:
        test_string = test_string.split(' (')[0]

    def lookup(name):
        """Return (True, value) for the named attribute, else (False, None)."""
        for position, attribute in enumerate(attributes):
            if attribute[0] == name:
                return True, instance[position]
        return False, None

    if ' > ' in test_string:
        parts = test_string.split(' > ')
        threshold = float(parts[1])
        found, actual = lookup(parts[0])
        if found:
            return actual > threshold
    if ' = ' in test_string:
        parts = test_string.split(' = ')
        found, actual = lookup(parts[0])
        if found:
            return actual == parts[1]
    if " <= " in test_string:
        parts = test_string.split(' <= ')
        # The threshold must be numeric, exactly as in the '>' case; comparing
        # the value against the raw string gives a meaningless result on
        # Python 2 and a TypeError on Python 3.
        threshold = float(parts[1])
        found, actual = lookup(parts[0])
        if found:
            return actual <= threshold
    if " in " in test_string:
        parts = test_string.split(' in ')
        allowed = [item.strip() for item in parts[1].replace("{", "").replace("}", "").split(",")]
        found, actual = lookup(parts[0])
        if found:
            return actual in allowed
    return None


def get_instance_nodes(node, instance, attributes):
    """Return the 'dot_id's of the nodes that *instance* passes through.

    Starting at *node*, the node's test is evaluated with perform_test() and
    the walk continues into children whose 'branch_label' is 'Yes' for a true
    result or 'No' for a false one.  When the test result is neither (for
    example None) no child is followed.  Every node must already carry a
    'dot_id'.
    """
    visited = [node['dot_id']]
    if 'children' not in node:
        return visited

    outcome = perform_test(node['test_string'], instance, attributes)
    for child in node['children']:
        label = child['branch_label']
        follows_yes = outcome == True and label == 'Yes'
        follows_no = outcome == False and label == 'No'
        if follows_yes or follows_no:
            visited = visited + get_instance_nodes(child, instance, attributes)
    return visited


第4个: settings.py

import os

# === STANDARD PACKAGE SETTINGS ===
# Directory that contains this settings module; used below to build data paths.
PACKAGE_ROOT = os.path.dirname(__file__)

# === AUTO IMPORT OPTIONS ===
# If AUTO_IMPORT_DB is true, the data file(s) listed in AUTO_IMPORT_DB_FILES are
# automatically imported when the ClowdFlows project is newly deployed or
# refreshed from git.
AUTO_IMPORT_DB = False
# For the meaning of AUTO_IMPORT_DB_REPLACE_OPTION see the 'replace' option of
# the workflows/import_package command.
AUTO_IMPORT_DB_REPLACE_OPTION = True
# If file(s) other than ./db/package_data.json should be imported, list them in
# AUTO_IMPORT_DB_FILES.
AUTO_IMPORT_DB_FILES = [os.path.join(PACKAGE_ROOT,'db/package_data.json')]

第5个: urls.py

from django.conf.urls.defaults import patterns, include, url

# No routes are registered for this package: patterns() receives only the empty
# view prefix, and every url() entry below is commented out.
# NOTE(review): the commented entries point at workflows.latino views and look
# like leftovers from another package -- confirm before re-enabling any of them.
urlpatterns = patterns('',
    #url(r'^get-adc-index/widget(?P<widget_id>[0-9]+)/nx/Index.html$', 'workflows.latino.views.get_adc_index', name='get adc index'),
    #url(r'^get-adc-index/widget(?P<widget_id>[0-9]+)/(?P<narrow_doc>n?)x/Index.html$', 'workflows.latino.views.get_adc_index', name='get adc index'),
    #url(r'^get-adc-index/widget(?P<widget_id>[0-9]+)/(?P<narrow_doc>n?)x/Index(?P<document_id_from>[0-9]+)-(?P<document_id_to>[0-9]+).html$', 'workflows.latino.views.get_adc_index', name='get adc index'),
    #url(r'^get-adc-index/widget(?P<widget_id>[0-9]+)/(?P<narrow_doc>n?)x/Document(?P<document_id>[0-9]+).html', 'workflows.latino.views.get_adc_page', name='get adc page'),
)

第6个:interaction_views.py 就一行

  • 收藏

5条回答 默认 最新

相关推荐 更多相似问题