目录

分布式任务系统 cobweb

1. 源码下载

请访问 GitHub 查看版本信息,下载最新源码

2. 功能特性

a) 基础功能

b) 特性

c) 使用场景

d) 语言环境

3. 开发说明

a) 源码结构

server

client

b) Server端开发

首先import相关模块

import json
import web
import Cobweb.Fname
import Cobweb.Task

初始化Fname模块,定义加密串,设计好web.py的路由。

# dict_string for encryption
dictString = "ThankYouForChoosingMoreTV"
 
Fname = Cobweb.Fname.Fname(dictString)
Task = Cobweb.Task.Task()
 
urls = (
    '/cobweb/index', 'index', # Provide modules' list
    '/cobweb/download/(.+)', 'download', # Provide module files
    '/cobweb/parameter', 'parameter', # Provide task para
    '/cobweb/result', 'result', # Get task's result 
    '/cobweb/reindex', 'reindex',  # Rebuild the index (Manual)
)

获取模块文件列表接口

class index:
    def GET(self):
        return Fname.index()  # Provide modules' list

下载模块文件接口

class download:
    def GET(self, fname):
        return Fname.files(fname) # Provide module files

手动刷新模块文件列表接口(可选,另一种做法是每次更新模块文件,重启Server端服务)

class reindex:
    def GET(self):
        return Fname.newindex() # Rebuild the index

启动Server端的web服务

if __name__ == "__main__":
    app = web.application(urls, globals())
    app.run()

同时可选提供任务分发接口,以及任务结果上传接口。 任务分发接口:

class parameter:
    def POST(self):
        data = web.input()
        Host = data["Host"] if "Host" in data else "unknow"
        Module = data["Module"] if "Module" in data else False
        ### not ready ###
        task = Task.request(Module, Host)
        #################
        para = Fname.encryption(json.dumps(task))
        return para

任务结果上传接口

class result:
    def POST(self):
        data = web.input()
        Host = data["Host"] if "Host" in data else "unknow"
        Module = data["Module"] if "Module" in data else False
        Result = data["Result"] if "Result" in data else False
        ### not ready ###
        status = Task.response(Module, Host, Result)
        #################
        return status

c) Client端开发

配置公共参数,包括client端的进程pid、日志log文件名,以及终端标识、日志级别(0-5)。当日志级别大于1时,作为后台进程模式运行。

pidFile = "cobweb.pid"
hostName = "TestHost"
logFile = "cobweb.log"
logLevel = 1 # if logLevel > 0 then run as a daemon

初始化参数,注意加密字符串必须保证和Server端相同。可配置多组参数,对应不同Server

taskList = [
    {
        "dictString" : "ThankYouForChoosingMoreTV",
        "indexUrl" : "http://127.0.0.1:8080/cobweb/index",
        "downloadUrl" : "http://127.0.0.1:8080/cobweb/download",
        "taskFreq" : 5, # Option
        "indexFreq" : 15, # Option        
        "taskUrl" : "http://127.0.0.1:8080/cobweb/list",  # Option
        "resultUrl" : "http://127.0.0.1:8080/cobweb/result", # Option
    }
]

加载模块,初始化并以进程模式,启动任务检查与执行。

import Cobweb.Process
import Cobweb.Thread
import Cobweb.Utiles
 
Logger = Cobweb.Utiles.Logger(logFile, logLevel)
Daemon = Cobweb.Process.Daemon(pidFile, Logger)
 
try:
    if 0 == logLevel:
        import time
        for task in taskList:
            Cobweb.Thread.Tasks(task, hostName, Logger).start()
        while True: time.sleep(9999)
    else:
        for task in taskList:
            Daemon.start(Cobweb.Thread.Tasks(task, hostName, Logger))
except (IOError,EOFError,KeyboardInterrupt):
    Daemon.killAll()
    exit(0)

d) 自定义任务模块开发

自定义任务模块必须定义一个task函数,作为入口函数。

def sqrt(n):
	return n**n
 
def task(n=False):
    if n:
        r = sqrt(n)
    else:
        r = 0
    return r