|
@@ -1,462 +1,471 @@
|
|
-#!/usr/bin/python3
|
|
|
|
-import requests
|
|
|
|
-import json
|
|
|
|
-import sys
|
|
|
|
-import time
|
|
|
|
-import _thread
|
|
|
|
-import os
|
|
|
|
-import subprocess
|
|
|
|
-import socket
|
|
|
|
-
|
|
|
|
-ERNIE_API_KEY = ""
|
|
|
|
-ERNIE_SECRET_KEY = ""
|
|
|
|
-
|
|
|
|
-OPENAI_URL = ""
|
|
|
|
-OPENAI_API_KEY = ""
|
|
|
|
-OPENAI_MODEL = ""
|
|
|
|
-
|
|
|
|
-# 后端可选“openai”或“ernie”
|
|
|
|
-BACKEND = ""
|
|
|
|
-
|
|
|
|
-# 是否启用本地模式
|
|
|
|
-USE_LOCAL = False
|
|
|
|
-LOCAL_PORT = 11434
|
|
|
|
-
|
|
|
|
-PROMPT = """
|
|
|
|
-你是一个Linux终端,请将我的请求转换成一条最简洁的bash命令。
|
|
|
|
-只给一条命令,不要做任何解释或说明。
|
|
|
|
-
|
|
|
|
-示例:
|
|
|
|
-请求:显示系统版本信息。
|
|
|
|
-输出:
|
|
|
|
-```bash
|
|
|
|
-uname -a
|
|
|
|
-```
|
|
|
|
-"""
|
|
|
|
-chats = []
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-
|
|
|
|
-# 设置API_KEY和SECRET_KEY
|
|
|
|
-def set_config():
|
|
|
|
- global ERNIE_API_KEY, ERNIE_SECRET_KEY, OPENAI_API_KEY, OPENAI_MODEL, OPENAI_URL, BACKEND
|
|
|
|
- if os.path.exists(os.path.expanduser('~/.config/erniecli/.ernierc')):
|
|
|
|
- get_config()
|
|
|
|
- elif not os.path.exists(os.path.expanduser('~/.config/erniecli')):
|
|
|
|
- os.makedirs(os.path.expanduser('~/.config/erniecli'))
|
|
|
|
- # 用黄色字体
|
|
|
|
- bidx = 0
|
|
|
|
- if BACKEND == "ernie":
|
|
|
|
- bidx = 1
|
|
|
|
- elif BACKEND == "openai":
|
|
|
|
- bidx = 2
|
|
|
|
- elif BACKEND == "":
|
|
|
|
- bidx = 0
|
|
|
|
-
|
|
|
|
- while True:
|
|
|
|
- choice_bidx = input("\033[1;33m请选择后端(0: None | 1: ernie | 2: openai)[current: {0}]:".format(bidx))
|
|
|
|
- if choice_bidx == '':
|
|
|
|
- bidx = bidx
|
|
|
|
- else:
|
|
|
|
- bidx = int(choice_bidx)
|
|
|
|
- if bidx == 1 or bidx == 2:
|
|
|
|
- break
|
|
|
|
- if bidx == 1:
|
|
|
|
- BACKEND = "ernie"
|
|
|
|
- elif bidx == 2:
|
|
|
|
- BACKEND = "openai"
|
|
|
|
-
|
|
|
|
- choose_local = input("\033[1;35m是否需要配置本地模式?(y/N)")
|
|
|
|
- if choose_local == "y":
|
|
|
|
- USE_LOCAL = True
|
|
|
|
- else:
|
|
|
|
- USE_LOCAL = False
|
|
|
|
-
|
|
|
|
- choice_ernie = input("\033[1;34m是否需要配置ernie?(y/N)")
|
|
|
|
- if choice_ernie == "y":
|
|
|
|
- apikey_value = input("\033[1;34m请输入API_KEY(当前值:"+ERNIE_API_KEY+"):")
|
|
|
|
- securekey_value = input("请输入SECRET_KEY(当前值"+ERNIE_SECRET_KEY+"):")
|
|
|
|
- if apikey_value != "":
|
|
|
|
- ERNIE_API_KEY = apikey_value.strip()
|
|
|
|
- if securekey_value != "":
|
|
|
|
- ERNIE_SECRET_KEY = securekey_value.strip()
|
|
|
|
-
|
|
|
|
- choice_openai = input("\033[1;36m是否需要配置openai?(y/N)")
|
|
|
|
- if choice_openai == "y":
|
|
|
|
- url_value = input("\033[1;36m请输入BASE URL(当前值:"+OPENAI_URL+"):")
|
|
|
|
- apikey_value = input("\033[1;36m请输入API_KEY(当前值:"+OPENAI_API_KEY+"):")
|
|
|
|
- model_value = input("请输入模型(当前值:"+OPENAI_MODEL+"):")
|
|
|
|
- if url_value != "":
|
|
|
|
- OPENAI_URL = url_value.strip()
|
|
|
|
- if apikey_value != "":
|
|
|
|
- OPENAI_API_KEY = apikey_value.strip()
|
|
|
|
- if model_value != "":
|
|
|
|
- OPENAI_MODEL = model_value.strip()
|
|
|
|
-
|
|
|
|
- with open(os.path.expanduser('~/.config/erniecli/.ernierc'), 'w', encoding='utf-8') as f:
|
|
|
|
- # 写入所有配置
|
|
|
|
- f.write("[GLOBAL]\n")
|
|
|
|
- f.write("BACKEND="+BACKEND+"\n")
|
|
|
|
- f.write("LOCAL="+str(USE_LOCAL)+"\n")
|
|
|
|
- f.write("\n[ERNIE]\n")
|
|
|
|
- f.write("API_KEY="+ERNIE_API_KEY+"\n")
|
|
|
|
- f.write("SECRET_KEY="+ERNIE_SECRET_KEY+"\n")
|
|
|
|
- f.write("\n[OPENAI]\n")
|
|
|
|
- f.write("URL="+OPENAI_URL+"\n")
|
|
|
|
- f.write("API_KEY="+OPENAI_API_KEY+"\n")
|
|
|
|
- f.write("MODEL="+OPENAI_MODEL+"\n")
|
|
|
|
- print("\033[1;32m配置成功\033[0m")
|
|
|
|
- sys.exit(0)
|
|
|
|
-
|
|
|
|
-# 读取$HOME/.config/erniecli/.ernierc文件中API_KEY和SECRET_KEY
|
|
|
|
-def get_config():
|
|
|
|
- global ERNIE_API_KEY, ERNIE_SECRET_KEY, OPENAI_API_KEY, OPENAI_MODEL, OPENAI_URL, BACKEND, USE_LOCAL
|
|
|
|
- config = os.path.expanduser('~/.config/erniecli/.ernierc')
|
|
|
|
- if not os.path.exists(config):
|
|
|
|
- print("\033[1;31m请进行使用erniecli进行配置\033[0m")
|
|
|
|
- sys.exit(0)
|
|
|
|
- # 读取配置文件,读取[global]的BACKEND
|
|
|
|
- group = "global"
|
|
|
|
- with open(config, 'r', encoding='utf-8') as f:
|
|
|
|
- for line in f.readlines():
|
|
|
|
- line = line.strip()
|
|
|
|
- if len(line) == 0 or line[0] == '#':
|
|
|
|
- continue
|
|
|
|
-
|
|
|
|
- elif line.startswith("["):
|
|
|
|
- group = line[1:-1]
|
|
|
|
- continue
|
|
|
|
- # 配置global
|
|
|
|
- if group == "GLOBAL":
|
|
|
|
- key, value = line.split('=')
|
|
|
|
- if key.strip() == "BACKEND":
|
|
|
|
- BACKEND = value.strip()
|
|
|
|
- if key.strip() == "LOCAL":
|
|
|
|
- USE_LOCAL = value.strip() == "True"
|
|
|
|
- if group == "ERNIE":
|
|
|
|
- key, value = line.split('=')
|
|
|
|
- if key.strip() == "API_KEY":
|
|
|
|
- ERNIE_API_KEY = value.strip()
|
|
|
|
- if key.strip() == "SECRET_KEY":
|
|
|
|
- ERNIE_SECRET_KEY = value.strip()
|
|
|
|
- if group == "OPENAI":
|
|
|
|
- key, value = line.split('=')
|
|
|
|
- if key.strip() == "API_KEY":
|
|
|
|
- OPENAI_API_KEY = value.strip()
|
|
|
|
- if key.strip() == "MODEL":
|
|
|
|
- OPENAI_MODEL = value.strip()
|
|
|
|
- if key.strip() == "URL":
|
|
|
|
- OPENAI_URL = value.strip()
|
|
|
|
-
|
|
|
|
-# 查询百度千帆
|
|
|
|
-def askERNIE(question):
|
|
|
|
- global chats
|
|
|
|
- url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-speed-128k?access_token=" + get_access_token()
|
|
|
|
-
|
|
|
|
- data = {
|
|
|
|
- "messages": [
|
|
|
|
- ],
|
|
|
|
- "temperature": 0.95,
|
|
|
|
- "top_p": 0.8,
|
|
|
|
- "penalty_score": 1,
|
|
|
|
- "disable_search": False,
|
|
|
|
- "enable_citation": False,
|
|
|
|
- "response_format": "text"
|
|
|
|
- }
|
|
|
|
- index = 0
|
|
|
|
- for chat in chats:
|
|
|
|
- quest = chat[0]
|
|
|
|
- if index == 0:
|
|
|
|
- quest = PROMPT+"我的问题是:{0}".format(quest)
|
|
|
|
- index = index + 1
|
|
|
|
- data['messages'].append({
|
|
|
|
- "role": "user",
|
|
|
|
- "content": quest
|
|
|
|
- })
|
|
|
|
- data['messages'].append({
|
|
|
|
- "role": "assistant",
|
|
|
|
- "content": chat[1]
|
|
|
|
- })
|
|
|
|
- if index == 0:
|
|
|
|
- question = PROMPT+"我的问题是:{0}".format(question)
|
|
|
|
- data['messages'].append({
|
|
|
|
- "role": "user",
|
|
|
|
- "content": question
|
|
|
|
- })
|
|
|
|
- payload = json.dumps(data)
|
|
|
|
- headers = {
|
|
|
|
- 'Content-Type': 'application/json'
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- response = requests.request("POST", url, headers=headers, data=payload)
|
|
|
|
-
|
|
|
|
- return response.text
|
|
|
|
-
|
|
|
|
-def get_access_token():
|
|
|
|
- """
|
|
|
|
- 使用 AK,SK 生成鉴权签名(Access Token)
|
|
|
|
- :return: access_token,或是None(如果错误)
|
|
|
|
- """
|
|
|
|
- url = "https://aip.baidubce.com/oauth/2.0/token"
|
|
|
|
- params = {"grant_type": "client_credentials", "client_id": ERNIE_API_KEY, "client_secret": ERNIE_SECRET_KEY}
|
|
|
|
- return str(requests.post(url, params=params).json().get("access_token"))
|
|
|
|
-
|
|
|
|
-# 查询OpenAI接口,如赞同模型
|
|
|
|
-def askOpenAI(question):
|
|
|
|
- global OPENAI_URL, chats
|
|
|
|
- # 如果OPENAI_URL是/结尾,去掉最后的/
|
|
|
|
- if OPENAI_URL[-1] == '/':
|
|
|
|
- OPENAI_URL = OPENAI_URL[:-1]
|
|
|
|
- url = "{0}/v1/chat/completions".format(OPENAI_URL)
|
|
|
|
- data = {
|
|
|
|
- "model": OPENAI_MODEL,
|
|
|
|
- "messages": [
|
|
|
|
- {
|
|
|
|
- "role": "system",
|
|
|
|
- "content": PROMPT
|
|
|
|
- }
|
|
|
|
- ],
|
|
|
|
- "temperature": 0.3
|
|
|
|
- }
|
|
|
|
- for chat in chats:
|
|
|
|
- data['messages'].append({
|
|
|
|
- "role": "user",
|
|
|
|
- "content": chat[0]
|
|
|
|
- })
|
|
|
|
- data['messages'].append({
|
|
|
|
- "role": "assistant",
|
|
|
|
- "content": chat[1]
|
|
|
|
- })
|
|
|
|
- data["messages"].append({
|
|
|
|
- "role": "user",
|
|
|
|
- "content": question
|
|
|
|
- })
|
|
|
|
- payload = json.dumps(data)
|
|
|
|
-
|
|
|
|
- headers = {
|
|
|
|
- 'Content-Type': 'application/json',
|
|
|
|
- 'Authorization': 'Bearer '+OPENAI_API_KEY
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- response = requests.request("POST", url, headers=headers, data=payload)
|
|
|
|
- return response.text
|
|
|
|
-
|
|
|
|
-# 检查端口是否可用
|
|
|
|
-def check_port_available(port):
|
|
|
|
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
- sock.settimeout(1)
|
|
|
|
- result = sock.connect_ex(('localhost', port))
|
|
|
|
- if result == 0:
|
|
|
|
- return True
|
|
|
|
- else:
|
|
|
|
- return False
|
|
|
|
-
|
|
|
|
-# 查询本地
|
|
|
|
-def askLocal(question):
|
|
|
|
- global chats
|
|
|
|
- data = {
|
|
|
|
- "model": "codegemma:2b", # 使用 qwen:7b 模型
|
|
|
|
- "stream": False, # 禁用流式输出
|
|
|
|
- "messages": [
|
|
|
|
-
|
|
|
|
- {
|
|
|
|
- "role": "system",
|
|
|
|
- "content": PROMPT
|
|
|
|
- }
|
|
|
|
-
|
|
|
|
- ],
|
|
|
|
- "temperature": 0.3
|
|
|
|
- }
|
|
|
|
- for chat in chats:
|
|
|
|
- data['messages'].append({
|
|
|
|
- "role": "user",
|
|
|
|
- "content": chat[0]
|
|
|
|
- })
|
|
|
|
- data['messages'].append({
|
|
|
|
- "role": "assistant",
|
|
|
|
- "content": chat[1]
|
|
|
|
- })
|
|
|
|
- data["messages"].append({
|
|
|
|
- "role": "user",
|
|
|
|
- "content": question
|
|
|
|
- })
|
|
|
|
- res = requests.post(
|
|
|
|
- "http://localhost:{0}/api/chat".format(LOCAL_PORT),
|
|
|
|
- json=data,
|
|
|
|
- )
|
|
|
|
- return res.text
|
|
|
|
-
|
|
|
|
-def parse(answer, isLocal):
|
|
|
|
- answer = json.loads(answer)
|
|
|
|
- # 获取第一个结果
|
|
|
|
- if isLocal:
|
|
|
|
- result = answer['message']['content']
|
|
|
|
- else:
|
|
|
|
- if BACKEND=="ernie":
|
|
|
|
- result = answer['result']
|
|
|
|
- elif BACKEND=="openai":
|
|
|
|
- result = answer['choices'][0]['message']['content']
|
|
|
|
-
|
|
|
|
- lines = result.split('\n')
|
|
|
|
- # 获取lines中的```bash项到```项,并拼成一个字符串
|
|
|
|
- cmd = ''
|
|
|
|
- start = False
|
|
|
|
- for line in lines:
|
|
|
|
- if line.strip() == '```bash':
|
|
|
|
- start = True
|
|
|
|
- continue
|
|
|
|
- if start:
|
|
|
|
- if line.strip() == '```':
|
|
|
|
- # 去掉最后的\n
|
|
|
|
- cmd = cmd[:-1]
|
|
|
|
- break
|
|
|
|
- cmd += line+'\n'
|
|
|
|
-
|
|
|
|
- return result, cmd
|
|
|
|
-
|
|
|
|
-def issueModel(question):
|
|
|
|
- global lock, LOCAL_PORT, chats
|
|
|
|
- # 显示转圈
|
|
|
|
- lock = [True, '']
|
|
|
|
- try:
|
|
|
|
- _thread.start_new_thread(loading, (lock,))
|
|
|
|
- except Exception as e:
|
|
|
|
- print(e)
|
|
|
|
- cli = ""
|
|
|
|
- if USE_LOCAL:
|
|
|
|
- # 检查11434端口是否可用
|
|
|
|
- if check_port_available(LOCAL_PORT):
|
|
|
|
- # 本地ollama
|
|
|
|
- lock[1] ="\033[1;32m终端助理正在本地思考...\033[0m"
|
|
|
|
- answer = askLocal(question)
|
|
|
|
- chats.append([question, answer])
|
|
|
|
- result, cli = parse(answer, True)
|
|
|
|
- if cli== "":
|
|
|
|
- lock[1] ="\033[1;32m终端助理正在云端思考...\033[0m"
|
|
|
|
-
|
|
|
|
- # 百度千帆
|
|
|
|
- if BACKEND=="ernie":
|
|
|
|
- query = askERNIE
|
|
|
|
- elif BACKEND=="openai":
|
|
|
|
- query = askOpenAI
|
|
|
|
-
|
|
|
|
- # OpenAI API
|
|
|
|
- answer = query(question)
|
|
|
|
-
|
|
|
|
- result, cli = parse(answer, False)
|
|
|
|
- chats.append([question, result])
|
|
|
|
- lock[0] = False
|
|
|
|
- return cli
|
|
|
|
-
|
|
|
|
-def loading(lock):
|
|
|
|
- chars = ['⣾', '⣷', '⣯', '⣟', '⡿', '⢿', '⣻', '⣽']
|
|
|
|
- i = 0
|
|
|
|
- print('')
|
|
|
|
- while lock[0]:
|
|
|
|
- i = (i+1) % len(chars)
|
|
|
|
- print('\033[A%s %s' %
|
|
|
|
- (chars[i], lock[1] or '' if len(lock) >= 2 else ''))
|
|
|
|
- time.sleep(0.1)
|
|
|
|
-
|
|
|
|
-# 执行alias命令,获取输出结果保存为alias_result
|
|
|
|
-def alias():
|
|
|
|
- # 定义一个dict
|
|
|
|
- alias_result = {'egrep': 'egrep --color=auto'
|
|
|
|
- ,'grep': 'grep --color=auto'
|
|
|
|
- ,'fgrep': 'fgrep --color=auto'
|
|
|
|
- ,'grep': 'grep --color=auto'
|
|
|
|
- ,'l': 'ls -CF'
|
|
|
|
- ,'ll': 'ls -alF'
|
|
|
|
- ,'la': 'ls -A'
|
|
|
|
- ,'ls': 'ls --color=auto'}
|
|
|
|
- return alias_result
|
|
|
|
-
|
|
|
|
-def replaceAlias(cmd):
|
|
|
|
- # 获取alias_result
|
|
|
|
- alias_result = alias()
|
|
|
|
- # 获取cmd中的第一个单词
|
|
|
|
- cmd_first = cmd.split(' ')[0]
|
|
|
|
- # 如果cmd_first在alias_result中,则替换
|
|
|
|
- if cmd_first in alias_result:
|
|
|
|
- cmd = alias_result[cmd_first] + ' ' + cmd.replace(cmd_first, '')
|
|
|
|
- return cmd
|
|
|
|
-
|
|
|
|
-if __name__ == '__main__':
|
|
|
|
- global lock
|
|
|
|
- # 如果没有参数
|
|
|
|
- if len(sys.argv) < 2:
|
|
|
|
- print("Copyright (c) 2024 Xiongwei Yu. Info-Terminal Copilot v1.0 \n\n \
|
|
|
|
-Usage: \n \
|
|
|
|
-erniecli command question : \"?? question\" or \"? question\" for short, quest a command and run\n \
|
|
|
|
-erniecli config : set your config\n \
|
|
|
|
-erniecli alias : show alias\n \
|
|
|
|
-erniecli version : show version")
|
|
|
|
- sys.exit(0)
|
|
|
|
- # 获取第一个参数
|
|
|
|
- cmd = sys.argv[1]
|
|
|
|
- if cmd == "config":
|
|
|
|
- set_config()
|
|
|
|
- # 设置??别名
|
|
|
|
- if cmd == "alias":
|
|
|
|
- print ("alias erniecli='erniecli.py'")
|
|
|
|
- print ("alias ??='erniecli.py command'")
|
|
|
|
- print ("alias ??='erniecli.py command'")
|
|
|
|
- print ("alias ?='erniecli.py command'")
|
|
|
|
- print ("alias ?='erniecli.py command'")
|
|
|
|
- # 显示版本信息
|
|
|
|
- if cmd == "version":
|
|
|
|
- # 紫色显示
|
|
|
|
- print("\033[1;95m终端助理 Version 0.1\n\033[0m")
|
|
|
|
- # 用绿色字体显示“基于文心一言的对话式命令行助理”
|
|
|
|
- print("\033[1;32m基于大语言模型的对话式终端助理\n可使用百度千帆文心大模型ERNIE-3.5-8K或其他OpenAI接口的大语言模型\n让命令行插上AI的翅膀🪽\033[0m")
|
|
|
|
- sys.exit(0)
|
|
|
|
- # 如果cmd为command,调用ask函数
|
|
|
|
- if cmd == "command":
|
|
|
|
- get_config()
|
|
|
|
- # 获取第二个参数
|
|
|
|
- # 如果第二个参数为空,则输出错误,用红色字体显示
|
|
|
|
- if len(sys.argv) < 3:
|
|
|
|
- print("\033[1;31m请输入你的意图\033[0m")
|
|
|
|
- sys.exit(0)
|
|
|
|
- # 获取后面的所有参数,并拼接成字符串
|
|
|
|
- question = ' '.join(sys.argv[2:])
|
|
|
|
- # question = sys.argv[2]
|
|
|
|
- # 如果question为空,则输出错误,用红色字体显示
|
|
|
|
- if question == "":
|
|
|
|
- print("\033[1;31m请输入你的意图\033[0m")
|
|
|
|
- sys.exit(0)
|
|
|
|
- # 调用ask函数,并输出结果
|
|
|
|
-
|
|
|
|
- #使用绿色字体
|
|
|
|
- cli = ""
|
|
|
|
-
|
|
|
|
- while True:
|
|
|
|
- cli = issueModel(question)
|
|
|
|
- if cli == "":
|
|
|
|
- question = input("\033[A\033[2K\033[1;33mAI没有找到可执行的命令[\x1B[4m\033[37mA\x1B[0m\033[1;33mbort/您的需求] \033[0m")
|
|
|
|
- if question == "A" or question == "a" or question == "":
|
|
|
|
- print('已取消')
|
|
|
|
- sys.exit(0)
|
|
|
|
- else:
|
|
|
|
- continue
|
|
|
|
- print('\033[F\033[K',end = "\033[1;32m❯ \033[0m")
|
|
|
|
- print('\033[1;32m{0}\033[0m'.format(cli))
|
|
|
|
-
|
|
|
|
- question = input('\033[1;32m? \033[0m\033[1;90m是否执行 ⬆ ? [\x1B[4m\033[37mC\x1B[0m\033[1;90monfirm/\x1B[4m\033[37ma\x1B[0m\033[1;90mbort/您的需求] \033[0m')
|
|
|
|
- if question == 'C' or question == 'c' or question == '':
|
|
|
|
- sys.stdout.write('\033[A\r')
|
|
|
|
- sys.stdout.flush()
|
|
|
|
- sys.stdout.write('\033[K')
|
|
|
|
- sys.stdout.flush()
|
|
|
|
- # 执行命令,并输出结果
|
|
|
|
- # print('')
|
|
|
|
- cmd = replaceAlias(cli)
|
|
|
|
- subprocess.run(cli, shell=True)
|
|
|
|
- sys.exit(0)
|
|
|
|
- elif question == 'A' or question == 'a':
|
|
|
|
- print('已取消')
|
|
|
|
- sys.exit(0)
|
|
|
|
- else:
|
|
|
|
- continue
|
|
|
|
-
|
|
|
|
-
|
|
|
|
|
|
+#!/usr/bin/python3
|
|
|
|
+import requests
|
|
|
|
+import json
|
|
|
|
+import sys
|
|
|
|
+import time
|
|
|
|
+import _thread
|
|
|
|
+import os
|
|
|
|
+import subprocess
|
|
|
|
+import socket
|
|
|
|
+
|
|
|
|
+ERNIE_API_KEY = ""
|
|
|
|
+ERNIE_SECRET_KEY = ""
|
|
|
|
+
|
|
|
|
+OPENAI_URL = ""
|
|
|
|
+OPENAI_API_KEY = ""
|
|
|
|
+OPENAI_MODEL = ""
|
|
|
|
+
|
|
|
|
+# 后端可选“openai”或“ernie”
|
|
|
|
+BACKEND = ""
|
|
|
|
+
|
|
|
|
+# 是否启用本地模式
|
|
|
|
+USE_LOCAL = False
|
|
|
|
+LOCAL_PORT = 11434
|
|
|
|
+
|
|
|
|
+PROMPT2 = """
|
|
|
|
+你是一个Linux终端,请将我的请求转换成一条最简洁的bash命令。
|
|
|
|
+只给一条命令,不要做任何解释或说明。
|
|
|
|
+
|
|
|
|
+示例:
|
|
|
|
+请求:显示系统版本信息。
|
|
|
|
+输出:
|
|
|
|
+```bash
|
|
|
|
+uname -a
|
|
|
|
+```
|
|
|
|
+"""
|
|
|
|
+
|
|
|
|
+PROMPT = """
|
|
|
|
+你是一个Linux Bash终端,请将我的请求转化为一句bash指令,
|
|
|
|
+输出格式为
|
|
|
|
+```bash
|
|
|
|
+指令
|
|
|
|
+```
|
|
|
|
+输出尽量简洁,如果有多个指令,请把它们合并成一个输出
|
|
|
|
+"""
|
|
|
|
+chats = []
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+
|
|
|
|
+# 设置API_KEY和SECRET_KEY
|
|
|
|
+def set_config():
|
|
|
|
+ global ERNIE_API_KEY, ERNIE_SECRET_KEY, OPENAI_API_KEY, OPENAI_MODEL, OPENAI_URL, BACKEND
|
|
|
|
+ if os.path.exists(os.path.expanduser('~/.config/erniecli/.ernierc')):
|
|
|
|
+ get_config()
|
|
|
|
+ elif not os.path.exists(os.path.expanduser('~/.config/erniecli')):
|
|
|
|
+ os.makedirs(os.path.expanduser('~/.config/erniecli'))
|
|
|
|
+ # 用黄色字体
|
|
|
|
+ bidx = 0
|
|
|
|
+ if BACKEND == "ernie":
|
|
|
|
+ bidx = 1
|
|
|
|
+ elif BACKEND == "openai":
|
|
|
|
+ bidx = 2
|
|
|
|
+ elif BACKEND == "":
|
|
|
|
+ bidx = 0
|
|
|
|
+
|
|
|
|
+ while True:
|
|
|
|
+ choice_bidx = input("\033[1;33m请选择后端(0: None | 1: ernie | 2: openai)[current: {0}]:".format(bidx))
|
|
|
|
+ if choice_bidx == '':
|
|
|
|
+ bidx = bidx
|
|
|
|
+ else:
|
|
|
|
+ bidx = int(choice_bidx)
|
|
|
|
+ if bidx == 1 or bidx == 2:
|
|
|
|
+ break
|
|
|
|
+ if bidx == 1:
|
|
|
|
+ BACKEND = "ernie"
|
|
|
|
+ elif bidx == 2:
|
|
|
|
+ BACKEND = "openai"
|
|
|
|
+
|
|
|
|
+ choose_local = input("\033[1;35m是否需要配置本地模式?(y/N)")
|
|
|
|
+ if choose_local == "y":
|
|
|
|
+ USE_LOCAL = True
|
|
|
|
+ else:
|
|
|
|
+ USE_LOCAL = False
|
|
|
|
+
|
|
|
|
+ choice_ernie = input("\033[1;34m是否需要配置ernie?(y/N)")
|
|
|
|
+ if choice_ernie == "y":
|
|
|
|
+ apikey_value = input("\033[1;34m请输入API_KEY(当前值:"+ERNIE_API_KEY+"):")
|
|
|
|
+ securekey_value = input("请输入SECRET_KEY(当前值"+ERNIE_SECRET_KEY+"):")
|
|
|
|
+ if apikey_value != "":
|
|
|
|
+ ERNIE_API_KEY = apikey_value.strip()
|
|
|
|
+ if securekey_value != "":
|
|
|
|
+ ERNIE_SECRET_KEY = securekey_value.strip()
|
|
|
|
+
|
|
|
|
+ choice_openai = input("\033[1;36m是否需要配置openai?(y/N)")
|
|
|
|
+ if choice_openai == "y":
|
|
|
|
+ url_value = input("\033[1;36m请输入BASE URL(当前值:"+OPENAI_URL+"):")
|
|
|
|
+ apikey_value = input("\033[1;36m请输入API_KEY(当前值:"+OPENAI_API_KEY+"):")
|
|
|
|
+ model_value = input("请输入模型(当前值:"+OPENAI_MODEL+"):")
|
|
|
|
+ if url_value != "":
|
|
|
|
+ OPENAI_URL = url_value.strip()
|
|
|
|
+ if apikey_value != "":
|
|
|
|
+ OPENAI_API_KEY = apikey_value.strip()
|
|
|
|
+ if model_value != "":
|
|
|
|
+ OPENAI_MODEL = model_value.strip()
|
|
|
|
+
|
|
|
|
+ with open(os.path.expanduser('~/.config/erniecli/.ernierc'), 'w', encoding='utf-8') as f:
|
|
|
|
+ # 写入所有配置
|
|
|
|
+ f.write("[GLOBAL]\n")
|
|
|
|
+ f.write("BACKEND="+BACKEND+"\n")
|
|
|
|
+ f.write("LOCAL="+str(USE_LOCAL)+"\n")
|
|
|
|
+ f.write("\n[ERNIE]\n")
|
|
|
|
+ f.write("API_KEY="+ERNIE_API_KEY+"\n")
|
|
|
|
+ f.write("SECRET_KEY="+ERNIE_SECRET_KEY+"\n")
|
|
|
|
+ f.write("\n[OPENAI]\n")
|
|
|
|
+ f.write("URL="+OPENAI_URL+"\n")
|
|
|
|
+ f.write("API_KEY="+OPENAI_API_KEY+"\n")
|
|
|
|
+ f.write("MODEL="+OPENAI_MODEL+"\n")
|
|
|
|
+ print("\033[1;32m配置成功\033[0m")
|
|
|
|
+ sys.exit(0)
|
|
|
|
+
|
|
|
|
+# 读取$HOME/.config/erniecli/.ernierc文件中API_KEY和SECRET_KEY
|
|
|
|
+def get_config():
|
|
|
|
+ global ERNIE_API_KEY, ERNIE_SECRET_KEY, OPENAI_API_KEY, OPENAI_MODEL, OPENAI_URL, BACKEND, USE_LOCAL
|
|
|
|
+ config = os.path.expanduser('~/.config/erniecli/.ernierc')
|
|
|
|
+ if not os.path.exists(config):
|
|
|
|
+ print("\033[1;31m请进行使用erniecli进行配置\033[0m")
|
|
|
|
+ sys.exit(0)
|
|
|
|
+ # 读取配置文件,读取[global]的BACKEND
|
|
|
|
+ group = "global"
|
|
|
|
+ with open(config, 'r', encoding='utf-8') as f:
|
|
|
|
+ for line in f.readlines():
|
|
|
|
+ line = line.strip()
|
|
|
|
+ if len(line) == 0 or line[0] == '#':
|
|
|
|
+ continue
|
|
|
|
+
|
|
|
|
+ elif line.startswith("["):
|
|
|
|
+ group = line[1:-1]
|
|
|
|
+ continue
|
|
|
|
+ # 配置global
|
|
|
|
+ if group == "GLOBAL":
|
|
|
|
+ key, value = line.split('=')
|
|
|
|
+ if key.strip() == "BACKEND":
|
|
|
|
+ BACKEND = value.strip()
|
|
|
|
+ if key.strip() == "LOCAL":
|
|
|
|
+ USE_LOCAL = value.strip() == "True"
|
|
|
|
+ if group == "ERNIE":
|
|
|
|
+ key, value = line.split('=')
|
|
|
|
+ if key.strip() == "API_KEY":
|
|
|
|
+ ERNIE_API_KEY = value.strip()
|
|
|
|
+ if key.strip() == "SECRET_KEY":
|
|
|
|
+ ERNIE_SECRET_KEY = value.strip()
|
|
|
|
+ if group == "OPENAI":
|
|
|
|
+ key, value = line.split('=')
|
|
|
|
+ if key.strip() == "API_KEY":
|
|
|
|
+ OPENAI_API_KEY = value.strip()
|
|
|
|
+ if key.strip() == "MODEL":
|
|
|
|
+ OPENAI_MODEL = value.strip()
|
|
|
|
+ if key.strip() == "URL":
|
|
|
|
+ OPENAI_URL = value.strip()
|
|
|
|
+
|
|
|
|
+# 查询百度千帆
|
|
|
|
+def askERNIE(question):
|
|
|
|
+ global chats
|
|
|
|
+ url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/ernie-speed-128k?access_token=" + get_access_token()
|
|
|
|
+
|
|
|
|
+ data = {
|
|
|
|
+ "messages": [
|
|
|
|
+ ],
|
|
|
|
+ "temperature": 0.95,
|
|
|
|
+ "top_p": 0.8,
|
|
|
|
+ "penalty_score": 1,
|
|
|
|
+ "disable_search": False,
|
|
|
|
+ "enable_citation": False,
|
|
|
|
+ "response_format": "text"
|
|
|
|
+ }
|
|
|
|
+ index = 0
|
|
|
|
+ for chat in chats:
|
|
|
|
+ quest = chat[0]
|
|
|
|
+ if index == 0:
|
|
|
|
+ quest = PROMPT+"我的问题是:{0}".format(quest)
|
|
|
|
+ index = index + 1
|
|
|
|
+ data['messages'].append({
|
|
|
|
+ "role": "user",
|
|
|
|
+ "content": quest
|
|
|
|
+ })
|
|
|
|
+ data['messages'].append({
|
|
|
|
+ "role": "assistant",
|
|
|
|
+ "content": chat[1]
|
|
|
|
+ })
|
|
|
|
+ if index == 0:
|
|
|
|
+ question = PROMPT+"我的问题是:{0}".format(question)
|
|
|
|
+ data['messages'].append({
|
|
|
|
+ "role": "user",
|
|
|
|
+ "content": question
|
|
|
|
+ })
|
|
|
|
+ payload = json.dumps(data)
|
|
|
|
+ headers = {
|
|
|
|
+ 'Content-Type': 'application/json'
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ response = requests.request("POST", url, headers=headers, data=payload)
|
|
|
|
+
|
|
|
|
+ return response.text
|
|
|
|
+
|
|
|
|
+def get_access_token():
|
|
|
|
+ """
|
|
|
|
+ 使用 AK,SK 生成鉴权签名(Access Token)
|
|
|
|
+ :return: access_token,或是None(如果错误)
|
|
|
|
+ """
|
|
|
|
+ url = "https://aip.baidubce.com/oauth/2.0/token"
|
|
|
|
+ params = {"grant_type": "client_credentials", "client_id": ERNIE_API_KEY, "client_secret": ERNIE_SECRET_KEY}
|
|
|
|
+ return str(requests.post(url, params=params).json().get("access_token"))
|
|
|
|
+
|
|
|
|
+# 查询OpenAI接口,如赞同模型
|
|
|
|
+def askOpenAI(question):
|
|
|
|
+ global OPENAI_URL, chats
|
|
|
|
+ # 如果OPENAI_URL是/结尾,去掉最后的/
|
|
|
|
+ if OPENAI_URL[-1] == '/':
|
|
|
|
+ OPENAI_URL = OPENAI_URL[:-1]
|
|
|
|
+ url = "{0}/v1/chat/completions".format(OPENAI_URL)
|
|
|
|
+ data = {
|
|
|
|
+ "model": OPENAI_MODEL,
|
|
|
|
+ "messages": [
|
|
|
|
+ {
|
|
|
|
+ "role": "system",
|
|
|
|
+ "content": PROMPT
|
|
|
|
+ }
|
|
|
|
+ ],
|
|
|
|
+ "temperature": 0.3
|
|
|
|
+ }
|
|
|
|
+ for chat in chats:
|
|
|
|
+ data['messages'].append({
|
|
|
|
+ "role": "user",
|
|
|
|
+ "content": chat[0]
|
|
|
|
+ })
|
|
|
|
+ data['messages'].append({
|
|
|
|
+ "role": "assistant",
|
|
|
|
+ "content": chat[1]
|
|
|
|
+ })
|
|
|
|
+ data["messages"].append({
|
|
|
|
+ "role": "user",
|
|
|
|
+ "content": question
|
|
|
|
+ })
|
|
|
|
+ payload = json.dumps(data)
|
|
|
|
+
|
|
|
|
+ headers = {
|
|
|
|
+ 'Content-Type': 'application/json',
|
|
|
|
+ 'Authorization': 'Bearer '+OPENAI_API_KEY
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ response = requests.request("POST", url, headers=headers, data=payload)
|
|
|
|
+ return response.text
|
|
|
|
+
|
|
|
|
+# 检查端口是否可用
|
|
|
|
+def check_port_available(port):
|
|
|
|
+ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
|
|
|
+ sock.settimeout(1)
|
|
|
|
+ result = sock.connect_ex(('localhost', port))
|
|
|
|
+ if result == 0:
|
|
|
|
+ return True
|
|
|
|
+ else:
|
|
|
|
+ return False
|
|
|
|
+
|
|
|
|
+# 查询本地
|
|
|
|
+def askLocal(question):
|
|
|
|
+ global chats
|
|
|
|
+ data = {
|
|
|
|
+ "model": "codegemma:2b", # 使用 qwen:7b 模型
|
|
|
|
+ "stream": False, # 禁用流式输出
|
|
|
|
+ "messages": [
|
|
|
|
+
|
|
|
|
+ {
|
|
|
|
+ "role": "system",
|
|
|
|
+ "content": PROMPT
|
|
|
|
+ }
|
|
|
|
+
|
|
|
|
+ ],
|
|
|
|
+ "temperature": 0.3
|
|
|
|
+ }
|
|
|
|
+ for chat in chats:
|
|
|
|
+ data['messages'].append({
|
|
|
|
+ "role": "user",
|
|
|
|
+ "content": chat[0]
|
|
|
|
+ })
|
|
|
|
+ data['messages'].append({
|
|
|
|
+ "role": "assistant",
|
|
|
|
+ "content": chat[1]
|
|
|
|
+ })
|
|
|
|
+ data["messages"].append({
|
|
|
|
+ "role": "user",
|
|
|
|
+ "content": question
|
|
|
|
+ })
|
|
|
|
+ res = requests.post(
|
|
|
|
+ "http://localhost:{0}/api/chat".format(LOCAL_PORT),
|
|
|
|
+ json=data,
|
|
|
|
+ )
|
|
|
|
+ return res.text
|
|
|
|
+
|
|
|
|
+def parse(answer, isLocal):
|
|
|
|
+ answer = json.loads(answer)
|
|
|
|
+ # 获取第一个结果
|
|
|
|
+ if isLocal:
|
|
|
|
+ result = answer['message']['content']
|
|
|
|
+ else:
|
|
|
|
+ if BACKEND=="ernie":
|
|
|
|
+ result = answer['result']
|
|
|
|
+ elif BACKEND=="openai":
|
|
|
|
+ result = answer['choices'][0]['message']['content']
|
|
|
|
+
|
|
|
|
+ lines = result.split('\n')
|
|
|
|
+ # 获取lines中的```bash项到```项,并拼成一个字符串
|
|
|
|
+ cmd = ''
|
|
|
|
+ start = False
|
|
|
|
+ for line in lines:
|
|
|
|
+ if line.strip() == '```bash':
|
|
|
|
+ start = True
|
|
|
|
+ continue
|
|
|
|
+ if start:
|
|
|
|
+ if line.strip() == '```':
|
|
|
|
+ # 去掉最后的\n
|
|
|
|
+ cmd = cmd[:-1]
|
|
|
|
+ break
|
|
|
|
+ cmd += line+'\n'
|
|
|
|
+
|
|
|
|
+ return result, cmd
|
|
|
|
+
|
|
|
|
+def issueModel(question):
|
|
|
|
+ global lock, LOCAL_PORT, chats
|
|
|
|
+ # 显示转圈
|
|
|
|
+ lock = [True, '']
|
|
|
|
+ try:
|
|
|
|
+ _thread.start_new_thread(loading, (lock,))
|
|
|
|
+ except Exception as e:
|
|
|
|
+ print(e)
|
|
|
|
+ cli = ""
|
|
|
|
+ if USE_LOCAL:
|
|
|
|
+ # 检查11434端口是否可用
|
|
|
|
+ if check_port_available(LOCAL_PORT):
|
|
|
|
+ # 本地ollama
|
|
|
|
+ lock[1] ="\033[1;32m终端助理正在本地思考...\033[0m"
|
|
|
|
+ answer = askLocal(question)
|
|
|
|
+ chats.append([question, answer])
|
|
|
|
+ result, cli = parse(answer, True)
|
|
|
|
+ if cli== "":
|
|
|
|
+ lock[1] ="\033[1;32m终端助理正在云端思考...\033[0m"
|
|
|
|
+
|
|
|
|
+ # 百度千帆
|
|
|
|
+ if BACKEND=="ernie":
|
|
|
|
+ query = askERNIE
|
|
|
|
+ elif BACKEND=="openai":
|
|
|
|
+ query = askOpenAI
|
|
|
|
+
|
|
|
|
+ # OpenAI API
|
|
|
|
+ answer = query(question)
|
|
|
|
+
|
|
|
|
+ result, cli = parse(answer, False)
|
|
|
|
+ chats.append([question, result])
|
|
|
|
+ lock[0] = False
|
|
|
|
+ return cli
|
|
|
|
+
|
|
|
|
+def loading(lock):
|
|
|
|
+ chars = ['⣾', '⣷', '⣯', '⣟', '⡿', '⢿', '⣻', '⣽']
|
|
|
|
+ i = 0
|
|
|
|
+ print('')
|
|
|
|
+ while lock[0]:
|
|
|
|
+ i = (i+1) % len(chars)
|
|
|
|
+ print('\033[A%s %s' %
|
|
|
|
+ (chars[i], lock[1] or '' if len(lock) >= 2 else ''))
|
|
|
|
+ time.sleep(0.1)
|
|
|
|
+
|
|
|
|
+# 执行alias命令,获取输出结果保存为alias_result
|
|
|
|
+def alias():
|
|
|
|
+ # 定义一个dict
|
|
|
|
+ alias_result = {'egrep': 'egrep --color=auto'
|
|
|
|
+ ,'grep': 'grep --color=auto'
|
|
|
|
+ ,'fgrep': 'fgrep --color=auto'
|
|
|
|
+ ,'grep': 'grep --color=auto'
|
|
|
|
+ ,'l': 'ls -CF'
|
|
|
|
+ ,'ll': 'ls -alF'
|
|
|
|
+ ,'la': 'ls -A'
|
|
|
|
+ ,'ls': 'ls --color=auto'}
|
|
|
|
+ return alias_result
|
|
|
|
+
|
|
|
|
+def replaceAlias(cmd):
|
|
|
|
+ # 获取alias_result
|
|
|
|
+ alias_result = alias()
|
|
|
|
+ # 获取cmd中的第一个单词
|
|
|
|
+ cmd_first = cmd.split(' ')[0]
|
|
|
|
+ # 如果cmd_first在alias_result中,则替换
|
|
|
|
+ if cmd_first in alias_result:
|
|
|
|
+ cmd = alias_result[cmd_first] + ' ' + cmd.replace(cmd_first, '')
|
|
|
|
+ return cmd
|
|
|
|
+
|
|
|
|
+if __name__ == '__main__':
|
|
|
|
+ global lock
|
|
|
|
+ # 如果没有参数
|
|
|
|
+ if len(sys.argv) < 2:
|
|
|
|
+ print("Copyright (c) 2024 Xiongwei Yu. Info-Terminal Copilot v1.0 \n\n \
|
|
|
|
+Usage: \n \
|
|
|
|
+erniecli command question : \"?? question\" or \"? question\" for short, quest a command and run\n \
|
|
|
|
+erniecli config : set your config\n \
|
|
|
|
+erniecli alias : show alias\n \
|
|
|
|
+erniecli version : show version")
|
|
|
|
+ sys.exit(0)
|
|
|
|
+ # 获取第一个参数
|
|
|
|
+ cmd = sys.argv[1]
|
|
|
|
+ if cmd == "config":
|
|
|
|
+ set_config()
|
|
|
|
+ # 设置??别名
|
|
|
|
+ if cmd == "alias":
|
|
|
|
+ print ("alias erniecli='erniecli.py'")
|
|
|
|
+ print ("alias ??='erniecli.py command'")
|
|
|
|
+ print ("alias ??='erniecli.py command'")
|
|
|
|
+ print ("alias ?='erniecli.py command'")
|
|
|
|
+ print ("alias ?='erniecli.py command'")
|
|
|
|
+ # 显示版本信息
|
|
|
|
+ if cmd == "version":
|
|
|
|
+ # 紫色显示
|
|
|
|
+ print("\033[1;95m终端助理 Version 0.1\n\033[0m")
|
|
|
|
+ # 用绿色字体显示“基于文心一言的对话式命令行助理”
|
|
|
|
+ print("\033[1;32m基于大语言模型的对话式终端助理\n可使用百度千帆文心大模型ERNIE-3.5-8K或其他OpenAI接口的大语言模型\n让命令行插上AI的翅膀🪽\033[0m")
|
|
|
|
+ sys.exit(0)
|
|
|
|
+ # 如果cmd为command,调用ask函数
|
|
|
|
+ if cmd == "command":
|
|
|
|
+ get_config()
|
|
|
|
+ # 获取第二个参数
|
|
|
|
+ # 如果第二个参数为空,则输出错误,用红色字体显示
|
|
|
|
+ if len(sys.argv) < 3:
|
|
|
|
+ print("\033[1;31m请输入你的意图\033[0m")
|
|
|
|
+ sys.exit(0)
|
|
|
|
+ # 获取后面的所有参数,并拼接成字符串
|
|
|
|
+ question = ' '.join(sys.argv[2:])
|
|
|
|
+ # question = sys.argv[2]
|
|
|
|
+ # 如果question为空,则输出错误,用红色字体显示
|
|
|
|
+ if question == "":
|
|
|
|
+ print("\033[1;31m请输入你的意图\033[0m")
|
|
|
|
+ sys.exit(0)
|
|
|
|
+ # 调用ask函数,并输出结果
|
|
|
|
+
|
|
|
|
+ #使用绿色字体
|
|
|
|
+ cli = ""
|
|
|
|
+
|
|
|
|
+ while True:
|
|
|
|
+ cli = issueModel(question)
|
|
|
|
+ if cli == "":
|
|
|
|
+ question = input("\033[A\033[2K\033[1;33mAI没有找到可执行的命令[\x1B[4m\033[37mA\x1B[0m\033[1;33mbort/您的需求] \033[0m")
|
|
|
|
+ if question == "A" or question == "a" or question == "":
|
|
|
|
+ print('已取消')
|
|
|
|
+ sys.exit(0)
|
|
|
|
+ else:
|
|
|
|
+ continue
|
|
|
|
+ print('\033[F\033[K',end = "\033[1;32m❯ \033[0m")
|
|
|
|
+ print('\033[1;32m{0}\033[0m'.format(cli))
|
|
|
|
+
|
|
|
|
+ question = input('\033[1;32m? \033[0m\033[1;90m是否执行 ⬆ ? [\x1B[4m\033[37mC\x1B[0m\033[1;90monfirm/\x1B[4m\033[37ma\x1B[0m\033[1;90mbort/您的需求] \033[0m')
|
|
|
|
+ if question == 'C' or question == 'c' or question == '':
|
|
|
|
+ sys.stdout.write('\033[A\r')
|
|
|
|
+ sys.stdout.flush()
|
|
|
|
+ sys.stdout.write('\033[K')
|
|
|
|
+ sys.stdout.flush()
|
|
|
|
+ # 执行命令,并输出结果
|
|
|
|
+ # print('')
|
|
|
|
+ cmd = replaceAlias(cli)
|
|
|
|
+ subprocess.run(cli, shell=True)
|
|
|
|
+ sys.exit(0)
|
|
|
|
+ elif question == 'A' or question == 'a':
|
|
|
|
+ print('已取消')
|
|
|
|
+ sys.exit(0)
|
|
|
|
+ else:
|
|
|
|
+ continue
|
|
|
|
+
|
|
|
|
+
|