|
- #!/usr/bin/python3
- import requests
- import json
- import sys
- import time
- import _thread
- import os
- import subprocess
- import socket
- ERNIE_API_KEY = ""
- ERNIE_SECRET_KEY = ""
- OPENAI_URL = ""
- OPENAI_API_KEY = ""
- OPENAI_MODEL = ""
- # 后端可选“openai”或“ernie”
- BACKEND = ""
- # 是否启用本地模式
- USE_LOCAL = False
- LOCAL_PORT = 11434
- PROMPT = """
- 你是一个Linux命令行专家,请将我的请求转换成一条可执行的bash命令。
- 只给一条命令,不要做任何解释或说明。
- 示例:
- 请求:显示系统版本信息。
- 输出:
- ```bash
- uname -a
- ```
- """
- chats = []
- # 设置API_KEY和SECRET_KEY
- def set_config():
- global ERNIE_API_KEY, ERNIE_SECRET_KEY, OPENAI_API_KEY, OPENAI_MODEL, OPENAI_URL, BACKEND
- if os.path.exists(os.path.expanduser('~/.config/erniecli/.ernierc')):
- get_config()
- elif not os.path.exists(os.path.expanduser('~/.config/erniecli')):
- os.makedirs(os.path.expanduser('~/.config/erniecli'))
- # 用黄色字体
- bidx = 0
- if BACKEND == "ernie":
- bidx = 1
- elif BACKEND == "openai":
- bidx = 2
- elif BACKEND == "":
- bidx = 0
-
- while True:
- choice_bidx = input("\033[1;33m请选择后端(0: None | 1: ernie | 2: openai)[current: {0}]:".format(bidx))
- if choice_bidx == '':
- bidx = bidx
- else:
- bidx = int(choice_bidx)
- if bidx == 1 or bidx == 2:
- break
- if bidx == 1:
- BACKEND = "ernie"
- elif bidx == 2:
- BACKEND = "openai"
-
- choose_local = input("\033[1;35m是否需要配置本地模式?(y/N)")
- if choose_local == "y":
- USE_LOCAL = True
- else:
- USE_LOCAL = False
- choice_ernie = input("\033[1;34m是否需要配置ernie?(y/N)")
- if choice_ernie == "y":
- apikey_value = input("\033[1;34m请输入API_KEY(当前值:"+ERNIE_API_KEY+"):")
- securekey_value = input("请输入SECRET_KEY(当前值"+ERNIE_SECRET_KEY+"):")
- if apikey_value != "":
- ERNIE_API_KEY = apikey_value.strip()
- if securekey_value != "":
- ERNIE_SECRET_KEY = securekey_value.strip()
- choice_openai = input("\033[1;36m是否需要配置openai?(y/N)")
- if choice_openai == "y":
- url_value = input("\033[1;36m请输入BASE URL(当前值:"+OPENAI_URL+"):")
- apikey_value = input("\033[1;36m请输入API_KEY(当前值:"+OPENAI_API_KEY+"):")
- model_value = input("请输入模型(当前值:"+OPENAI_MODEL+"):")
- if url_value != "":
- OPENAI_URL = url_value.strip()
- if apikey_value != "":
- OPENAI_API_KEY = apikey_value.strip()
- if model_value != "":
- OPENAI_MODEL = model_value.strip()
-
- with open(os.path.expanduser('~/.config/erniecli/.ernierc'), 'w', encoding='utf-8') as f:
- # 写入所有配置
- f.write("[GLOBAL]\n")
- f.write("BACKEND="+BACKEND+"\n")
- f.write("LOCAL="+str(USE_LOCAL)+"\n")
- f.write("\n[ERNIE]\n")
- f.write("API_KEY="+ERNIE_API_KEY+"\n")
- f.write("SECRET_KEY="+ERNIE_SECRET_KEY+"\n")
- f.write("\n[OPENAI]\n")
- f.write("URL="+OPENAI_URL+"\n")
- f.write("API_KEY="+OPENAI_API_KEY+"\n")
- f.write("MODEL="+OPENAI_MODEL+"\n")
- print("\033[1;32m配置成功\033[0m")
- sys.exit(0)
- # 读取$HOME/.config/erniecli/.ernierc文件中API_KEY和SECRET_KEY
- def get_config():
- global ERNIE_API_KEY, ERNIE_SECRET_KEY, OPENAI_API_KEY, OPENAI_MODEL, OPENAI_URL, BACKEND, USE_LOCAL
- config = os.path.expanduser('~/.config/erniecli/.ernierc')
- if not os.path.exists(config):
- print("\033[1;31m请进行使用erniecli进行配置\033[0m")
- sys.exit(0)
- # 读取配置文件,读取[global]的BACKEND
- group = "global"
- with open(config, 'r', encoding='utf-8') as f:
- for line in f.readlines():
- line = line.strip()
- if len(line) == 0 or line[0] == '#':
- continue
- elif line.startswith("["):
- group = line[1:-1]
- continue
- # 配置global
- if group == "GLOBAL":
- key, value = line.split('=')
- if key.strip() == "BACKEND":
- BACKEND = value.strip()
- if key.strip() == "LOCAL":
- USE_LOCAL = value.strip() == "True"
- if group == "ERNIE":
- key, value = line.split('=')
- if key.strip() == "API_KEY":
- ERNIE_API_KEY = value.strip()
- if key.strip() == "SECRET_KEY":
- ERNIE_SECRET_KEY = value.strip()
- if group == "OPENAI":
- key, value = line.split('=')
- if key.strip() == "API_KEY":
- OPENAI_API_KEY = value.strip()
- if key.strip() == "MODEL":
- OPENAI_MODEL = value.strip()
- if key.strip() == "URL":
- OPENAI_URL = value.strip()
- # 查询百度千帆
- def askERNIE(question):
- global chats
- url = "https://aip.baidubce.com/rpc/2.0/ai_custom/v1/wenxinworkshop/chat/completions?access_token=" + get_access_token()
-
- data = {
- "messages": [
- ],
- "temperature": 0.95,
- "top_p": 0.8,
- "penalty_score": 1,
- "disable_search": False,
- "enable_citation": False,
- "response_format": "text"
- }
- index = 0
- for chat in chats:
- quest = chat[0]
- if index == 0:
- quest = PROMPT+"我的问题是:{0}".format(quest)
- index = index + 1
- data['messages'].append({
- "role": "user",
- "content": quest
- })
- data['messages'].append({
- "role": "assistant",
- "content": chat[1]
- })
- if index == 0:
- question = PROMPT+"我的问题是:{0}".format(question)
- data['messages'].append({
- "role": "user",
- "content": question
- })
- payload = json.dumps(data)
- headers = {
- 'Content-Type': 'application/json'
- }
-
- response = requests.request("POST", url, headers=headers, data=payload)
-
- return response.text
- def get_access_token():
- """
- 使用 AK,SK 生成鉴权签名(Access Token)
- :return: access_token,或是None(如果错误)
- """
- url = "https://aip.baidubce.com/oauth/2.0/token"
- params = {"grant_type": "client_credentials", "client_id": ERNIE_API_KEY, "client_secret": ERNIE_SECRET_KEY}
- return str(requests.post(url, params=params).json().get("access_token"))
- # 查询OpenAI接口,如赞同模型
- def askOpenAI(question):
- global OPENAI_URL, chats
- # 如果OPENAI_URL是/结尾,去掉最后的/
- if OPENAI_URL[-1] == '/':
- OPENAI_URL = OPENAI_URL[:-1]
- url = "{0}/v1/chat/completions".format(OPENAI_URL)
- data = {
- "model": OPENAI_MODEL,
- "messages": [
- {
- "role": "system",
- "content": PROMPT
- }
- ],
- "temperature": 0.3
- }
- for chat in chats:
- data['messages'].append({
- "role": "user",
- "content": chat[0]
- })
- data['messages'].append({
- "role": "assistant",
- "content": chat[1]
- })
- data["messages"].append({
- "role": "user",
- "content": question
- })
- payload = json.dumps(data)
-
- headers = {
- 'Content-Type': 'application/json',
- 'Authorization': 'Bearer '+OPENAI_API_KEY
- }
-
- response = requests.request("POST", url, headers=headers, data=payload)
- return response.text
- # 检查端口是否可用
- def check_port_available(port):
- sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
- sock.settimeout(1)
- result = sock.connect_ex(('localhost', port))
- if result == 0:
- return True
- else:
- return False
- # 查询本地
- def askLocal(question):
- global chats
- data = {
- "model": "qwen:7b", # 使用 qwen:7b 模型
- "stream": False, # 禁用流式输出
- "messages": [
-
- {
- "role": "system",
- "content": PROMPT
- }
-
- ],
- "temperature": 0.3
- }
- for chat in chats:
- data['messages'].append({
- "role": "user",
- "content": chat[0]
- })
- data['messages'].append({
- "role": "assistant",
- "content": chat[1]
- })
- data["messages"].append({
- "role": "user",
- "content": question
- })
- res = requests.post(
- "http://localhost:{0}/api/chat".format(LOCAL_PORT),
- json=data,
- )
- return res.text
- def parse(answer, isLocal):
- answer = json.loads(answer)
- # 获取第一个结果
- if isLocal:
- result = answer['message']['content']
- else:
- if BACKEND=="ernie":
- result = answer['result']
- elif BACKEND=="openai":
- result = answer['choices'][0]['message']['content']
-
- lines = result.split('\n')
- # 获取lines中的```bash项到```项,并拼成一个字符串
- cmd = ''
- start = False
- for line in lines:
- if line.strip() == '```bash':
- start = True
- continue
- if start:
- if line.strip() == '```':
- # 去掉最后的\n
- cmd = cmd[:-1]
- break
- cmd += line+'\n'
-
- return result, cmd
- def issueModel(question):
- global lock, LOCAL_PORT, chats
- # 显示转圈
- lock = [True, '']
- try:
- _thread.start_new_thread(loading, (lock,))
- except Exception as e:
- print(e)
- cli = ""
- if USE_LOCAL:
- # 检查11434端口是否可用
- if check_port_available(LOCAL_PORT):
- # 本地ollama
- lock[1] ="\033[1;32m终端助理正在本地思考...\033[0m"
- answer = askLocal(question)
- chats.append([question, answer])
- cli = parse(answer, True)
- if cli== "":
- lock[1] ="\033[1;32m终端助理正在云端思考...\033[0m"
- # 百度千帆
- if BACKEND=="ernie":
- query = askERNIE
- elif BACKEND=="openai":
- query = askOpenAI
- # OpenAI API
- answer = query(question)
-
- result, cli = parse(answer, False)
- chats.append([question, result])
- lock[0] = False
- return cli
- def loading(lock):
- chars = ['⣾', '⣷', '⣯', '⣟', '⡿', '⢿', '⣻', '⣽']
- i = 0
- print('')
- while lock[0]:
- i = (i+1) % len(chars)
- print('\033[A%s %s' %
- (chars[i], lock[1] or '' if len(lock) >= 2 else ''))
- time.sleep(0.1)
- # 执行alias命令,获取输出结果保存为alias_result
- def alias():
- # 定义一个dict
- alias_result = {'egrep': 'egrep --color=auto'
- ,'grep': 'grep --color=auto'
- ,'fgrep': 'fgrep --color=auto'
- ,'grep': 'grep --color=auto'
- ,'l': 'ls -CF'
- ,'ll': 'ls -alF'
- ,'la': 'ls -A'
- ,'ls': 'ls --color=auto'}
- return alias_result
- def replaceAlias(cmd):
- # 获取alias_result
- alias_result = alias()
- # 获取cmd中的第一个单词
- cmd_first = cmd.split(' ')[0]
- # 如果cmd_first在alias_result中,则替换
- if cmd_first in alias_result:
- cmd = alias_result[cmd_first] + ' ' + cmd.replace(cmd_first, '')
- return cmd
- if __name__ == '__main__':
- global lock
- # 如果没有参数
- if len(sys.argv) < 2:
- print("Copyright (c) 2024 Xiongwei Yu. Info-Terminal Copilot v1.0 \n\n \
- Usage: \n \
- erniecli command question : \"?? question\" or \"? question\" for short, quest a command and run\n \
- erniecli config : set your config\n \
- erniecli alias : show alias\n \
- erniecli version : show version")
- sys.exit(0)
- # 获取第一个参数
- cmd = sys.argv[1]
- if cmd == "config":
- set_config()
- # 设置??别名
- if cmd == "alias":
- print ("alias erniecli='erniecli.py'")
- print ("alias ??='erniecli.py command'")
- print ("alias ??='erniecli.py command'")
- print ("alias ?='erniecli.py command'")
- print ("alias ?='erniecli.py command'")
- # 显示版本信息
- if cmd == "version":
- # 紫色显示
- print("\033[1;95m终端助理 Version 0.1\n\033[0m")
- # 用绿色字体显示“基于文心一言的对话式命令行助理”
- print("\033[1;32m基于大语言模型的对话式终端助理\n可使用百度千帆文心大模型ERNIE-3.5-8K或其他OpenAI接口的大语言模型\n让命令行插上AI的翅膀🪽\033[0m")
- sys.exit(0)
- # 如果cmd为command,调用ask函数
- if cmd == "command":
- get_config()
- # 获取第二个参数
- # 如果第二个参数为空,则输出错误,用红色字体显示
- if len(sys.argv) < 3:
- print("\033[1;31m请输入你的意图\033[0m")
- sys.exit(0)
- # 获取后面的所有参数,并拼接成字符串
- question = ' '.join(sys.argv[2:])
- # question = sys.argv[2]
- # 如果question为空,则输出错误,用红色字体显示
- if question == "":
- print("\033[1;31m请输入你的意图\033[0m")
- sys.exit(0)
- # 调用ask函数,并输出结果
-
- #使用绿色字体
- cli = ""
- while True:
- cli = issueModel(question)
- if cli == "":
- question = input("\033[A\033[2K\033[1;33mAI没有找到可执行的命令[\x1B[4m\033[37mA\x1B[0m\033[1;33mbort/您的需求] \033[0m")
- if question == "A" or question == "a" or question == "":
- print('已取消')
- sys.exit(0)
- else:
- continue
- print('\033[F\033[K',end = "\033[1;32m❯ \033[0m")
- print('\033[1;32m{0}\033[0m'.format(cli))
-
- question = input('\033[1;32m? \033[0m\033[1;90m是否执行 ⬆ ? [\x1B[4m\033[37mC\x1B[0m\033[1;90monfirm/\x1B[4m\033[37ma\x1B[0m\033[1;90mbort/您的需求] \033[0m')
- if question == 'C' or question == 'c' or question == '':
- sys.stdout.write('\033[A\r')
- sys.stdout.flush()
- sys.stdout.write('\033[K')
- sys.stdout.flush()
- # 执行命令,并输出结果
- # print('')
- cmd = replaceAlias(cli)
- subprocess.run(cli, shell=True)
- sys.exit(0)
- elif question == 'A' or question == 'a':
- print('已取消')
- sys.exit(0)
- else:
- continue
|