| | import pika
|
| | import os
|
| |
|
| |
|
| | import json
|
| | import time
|
| |
|
| | from config import get_config
|
| | from function import topic_clustering_not_summary as tc
|
| | from function import topic_clustering_social
|
| | import requests
|
| |
|
# Load runtime configuration once at import time. ConfigManager is the
# nested dict of API endpoints and queue credentials used throughout.
config_params = get_config()

ConfigManager = config_params['ConfigManager']

# Timestamp of the last cache-log pruning pass (updated in callback_func).
last_time_check = time.time()
|
def update_result(result, type='daily', meta=None):
    """POST a clustering result to the save-clustering API.

    Inspects the first (top-ranked) document of each cluster for
    ``benchmark_<id>`` / ``benchmark_child_<id>`` keys to recover the
    benchmark ids, then posts the JSON-serialized result.

    Args:
        result: mapping of cluster id -> list of document dicts.
        type: result granularity label forwarded to the API ('daily' default).
        meta: optional request metadata; only ``country_code`` is read.
    """
    # Fix: the original used a mutable default argument (meta={}), which is
    # shared across calls; use None and create a fresh dict per call.
    if meta is None:
        meta = {}

    benchmark_children_id = -1
    benchmark_id = -1
    source_tagids = []

    for id_cluster in result:
        # Only the first document of each cluster is inspected; later
        # clusters overwrite earlier ids (last cluster wins), as before.
        for doc in result[id_cluster][:1]:
            source_tagids = doc.get('source_tagids', [])
            for key in doc:
                # Fix: the original used key.lstrip('benchmark_child_'),
                # which strips a *character set* from the left, not a prefix.
                # It happened to work for purely numeric ids; slicing off the
                # prefix length is the correct prefix removal.
                if "benchmark_child" in key:
                    benchmark_children_id = int(key[len('benchmark_child_'):])
                if "benchmark" in key and 'child' not in key:
                    benchmark_id = int(key[len('benchmark_'):])
                    break

    if not source_tagids:
        source_tagids = []

    if len(source_tagids) > 0:
        # Tagged sources override any benchmark ids found above.
        benchmark_id = 0
        benchmark_children_id = 0

    output = {
        "benchmark_id": benchmark_id,
        "benchmark_children_id": benchmark_children_id,
        "source_tagids": source_tagids,
        "country_code": meta.get('country_code', ''),
        "type": type,
        "data": json.dumps(result)
    }

    url = ConfigManager['ApiConnects']['api_save_clustering']['BaseUrl']

    res = requests.post(url, json=output)
    print(res.text)
    print('Update result !!!!!!!!!')
|
| |
|
def _load_cache_log():
    """Return the hash -> {time, response_path} cache index from disk.

    Returns an empty dict when log_run/log.txt is missing or unparsable,
    so a corrupt log never blocks message processing (best-effort cache).
    """
    try:
        with open("log_run/log.txt") as f:
            return json.load(f)
    except Exception as ve:
        print(ve)
        return {}


def _try_cached_result(data_dict, hash_str, st_time, meta):
    """Replay a previously computed result for hash_str, if cached.

    Returns True when a cached response file was found and re-posted;
    False otherwise (including when the cache file is missing/corrupt,
    in which case the caller recomputes).
    """
    try:
        if hash_str in data_dict:
            path_res = data_dict[hash_str]["response_path"]
            with open(path_res) as ff:
                results = json.load(ff)
            print("time analysis (cache): ", time.time() - st_time)
            update_result(results, meta=meta)
            return True
    except Exception as vee:
        # Stale index entry pointing at a deleted file: fall through.
        print(vee)
    return False


def _prune_cache_log(data_dict):
    """Drop cache entries older than 30 days, at most once per hour.

    Mutates data_dict in place; the caller persists it afterwards.
    """
    global last_time_check
    if time.time() - last_time_check > 3600:
        print("check log to del .....")
        last_time_check = time.time()
        # Collect first, then delete: never mutate a dict while iterating it.
        expired = [dt for dt in data_dict
                   if time.time() - data_dict[dt]["time"] > 30 * 24 * 3600]
        for dt in expired:
            del data_dict[dt]


def callback_func(ch, method, properties, body):
    """RabbitMQ consumer callback: cluster the docs in *body*, post results.

    The message is JSON with keys: docs, top_cluster, top_sentence,
    topn_summary, hash_str, st_time and optional meta. Results are cached
    on disk keyed by hash_str so identical requests are replayed cheaply.
    The message is acked only after processing completes, so a crash
    leaves it on the queue for redelivery.
    """
    print("receive done: ")

    starttime = time.time()
    body = json.loads(body.decode("utf-8"))

    docs = body['docs']

    threshold = 0.25  # fixed clustering similarity threshold
    top_cluster = body['top_cluster']
    top_sentence = body['top_sentence']
    topn_summary = body['topn_summary']
    hash_str = body['hash_str']
    st_time = body['st_time']
    meta = body.get('meta', {})
    country_code = meta.get("country_code", "")
    # Internal/social runs keep their messages; all others are deletable.
    delete_message = False if country_code in ["ICOMM-RND", "SOCIAL"] else True

    print("country_code: ", country_code, "meta: ", meta)

    data_dict = _load_cache_log()
    is_cache = _try_cached_result(data_dict, hash_str, st_time, meta)

    if not is_cache:
        # SOCIAL traffic uses the social-specific clustering pipeline.
        if country_code in ["SOCIAL"]:
            results = topic_clustering_social.topic_clustering(
                docs, threshold, top_cluster=top_cluster,
                top_sentence=top_sentence, topn_summary=topn_summary,
                delete_message=delete_message)
        else:
            results = tc.topic_clustering(
                docs, threshold, top_cluster=top_cluster,
                top_sentence=top_sentence, topn_summary=topn_summary,
                delete_message=delete_message)
        update_result(results, meta=meta)

        # Persist the response and index it for future cache hits.
        path_res = "log/result_{0}.txt".format(hash_str)
        with open(path_res, "w+") as ff:
            ff.write(json.dumps(results))

        data_dict[hash_str] = {"time": st_time, "response_path": path_res}

    _prune_cache_log(data_dict)
    with open("log_run/log.txt", "w+") as ff:
        ff.write(json.dumps(data_dict))
    print("time analysis: ", time.time() - starttime)
    ch.basic_ack(delivery_tag=method.delivery_tag)
|
| |
|
| |
|
def test():
    """Smoke-test topic clustering against a saved daily request fixture."""
    with open('req_daily/aus.json') as fh:
        payload = json.load(fh)

    response = payload['response']
    documents = response['docs']
    # Meta is extracted for parity with the live path but not used here.
    request_meta = response.get('meta', {})

    clusters = tc.topic_clustering(
        documents,
        0.25,
        top_cluster=payload['top_cluster'],
        top_sentence=payload['top_sentence'],
        topn_summary=payload['topn_summary'],
        delete_message=True,
    )
    print(clusters)
|
| |
|
| |
|
if __name__ == '__main__':

    # Queue connection settings come from the shared configuration.
    params = ConfigManager['QueueConfigs']['queue_topic_clustering']
    usr_name = params["UserName"]
    password = str(params["Password"])
    host = params["HostName"]
    virtual_host = params["VirtualHost"]
    queue_name = params["Queue"]

    # Consume forever; on any broker error, log, back off, and reconnect.
    while True:
        try:
            credentials = pika.PlainCredentials(usr_name, password)
            connection = pika.BlockingConnection(
                pika.ConnectionParameters(host=host,
                                          virtual_host=virtual_host,
                                          credentials=credentials,
                                          heartbeat=3600,
                                          blocked_connection_timeout=3600))
            channel = connection.channel()
            channel.queue_declare(queue=queue_name, durable=True,
                                  arguments={"x-max-priority": 10})
            print(" * wait message")
            # One unacked message in flight at a time per worker.
            channel.basic_qos(prefetch_count=1)
            channel.basic_consume(queue=queue_name,
                                  on_message_callback=callback_func)
            channel.start_consuming()
        except Exception as ex:
            # Fix: the original printed a placeholder-less f-string plus the
            # exception as a tuple (print(f'[ERROR] ', ex)); interpolate it.
            print(f'[ERROR] {ex}')
            # Fix: without a pause a dead broker causes a busy reconnect
            # loop that pegs the CPU and floods the log.
            time.sleep(5)
|
| |
|
| |
|