目录
分布式任务系统 cobweb
1. 源码下载
2. 功能特性
a) 基础功能
- cobweb的基本功能,是在一台Server上分发任务,在其它多台服务器上部署Client程序,并执行相应的任务。
- 任务函数代码编辑后在Server上发布,会自动分发到各台Client服务器上,并重载函数模块,使用新的执行代码。
- 任务函数可选择是无参数运行,也可选择从Server端获取任务列表。
- 任务函数可选择是运行后无返回,也可选择将运行结果提交到Server端
b) 特性
- Client程序部署简单,只需要部署并配置后运行即可。
- 一个Client可配置多个Server,来自不同Server的任务开启各自的进程执行。
- 来自同一个Server,但不同的任务模块,有各自独立的线程并发运行,支持多CPU。
- 任务代码分发、重载自动化,只需要维护Server服务器上的代码。
- Server和Client之间的通信进行文本加密。
c) 使用场景
- 分布式爬虫系统
- 直播地址监测
- 服务器监控
d) 语言环境
- 基于Python 2.7
- 服务端依赖web.py
任务队列部分依赖Redis和pyredis
3. 开发说明
a) 源码结构
server
- 服务端代码,模块包是Cobweb。
- 自定义的任务函数文件保持在Modules目录下,必须使用xxxx.py的文件名。
- 自定义的函数文件名以”lib_”开头时,可进行同步,供其它任务函数文件调用,但不会开启独立的任务线程。
- Modules子目录的内容也会进行同步,供其它任务函数文件调用,但不会开启独立的任务线程。
client
- 客户端代码,模块包是Cobweb。Modules目录保持为空即可,启动后client会自动同步server服务器上的Modules目录到本地
b) Server端开发
首先import相关模块
import json import web import Cobweb.Fname import Cobweb.Task
初始化Fname模块,定义加密串,设计好web.py的路由。
# dict_string for encryption dictString = "ThankYouForChoosingMoreTV" Fname = Cobweb.Fname.Fname(dictString) Task = Cobweb.Task.Task() urls = ( '/cobweb/index', 'index', # Provide modules' list '/cobweb/download/(.+)', 'download', # Provide module files '/cobweb/parameter', 'parameter', # Provide task para '/cobweb/result', 'result', # Get task's result '/cobweb/reindex', 'reindex', # Rebuild the index (Manual) )
获取模块文件列表接口
class index: def GET(self): return Fname.index() # Provide modules' list
下载模块文件接口
class download: def GET(self, fname): return Fname.files(fname) # Provide module files
手动刷新模块文件列表接口(可选,另一种做法是每次更新模块文件,重启Server端服务)
class reindex: def GET(self): return Fname.newindex() # Rebuild the index
启动Server端的web服务
if __name__ == "__main__": app = web.application(urls, globals()) app.run()
同时可选提供任务分发接口,以及任务结果上传接口。 任务分发接口:
class parameter: def POST(self): data = web.input() Host = data["Host"] if "Host" in data else "unknow" Module = data["Module"] if "Module" in data else False ### not ready ### task = Task.request(Module, Host) ################# para = Fname.encryption(json.dumps(task)) return para
任务结果上传接口
class result: def POST(self): data = web.input() Host = data["Host"] if "Host" in data else "unknow" Module = data["Module"] if "Module" in data else False Result = data["Result"] if "Result" in data else False ### not ready ### status = Task.response(Module, Host, Result) ################# return status
c) Client端开发
配置公共参数,包括client端的进程pid、日志log文件名,以及终端标识、日志级别(0-5)。当日志级别大于1时,作为后台进程模式运行。
pidFile = "cobweb.pid" hostName = "TestHost" logFile = "cobweb.log" logLevel = 1 # if logLevel > 0 then run as a daemon
初始化参数,注意加密字符串必须保证和Server端相同。可配置多组参数,对应不同Server
taskList = [ { "dictString" : "ThankYouForChoosingMoreTV", "indexUrl" : "http://127.0.0.1:8080/cobweb/index", "downloadUrl" : "http://127.0.0.1:8080/cobweb/download", "taskFreq" : 5, # Option "indexFreq" : 15, # Option "taskUrl" : "http://127.0.0.1:8080/cobweb/list", # Option "resultUrl" : "http://127.0.0.1:8080/cobweb/result", # Option } ]
加载模块,初始化并以进程模式,启动任务检查与执行。
import Cobweb.Process import Cobweb.Thread import Cobweb.Utiles Logger = Cobweb.Utiles.Logger(logFile, logLevel) Daemon = Cobweb.Process.Daemon(pidFile, Logger) try: if 0 == logLevel: import time for task in taskList: Cobweb.Thread.Tasks(task, hostName, Logger).start() while True: time.sleep(9999) else: for task in taskList: Daemon.start(Cobweb.Thread.Tasks(task, hostName, Logger)) except (IOError,EOFError,KeyboardInterrupt): Daemon.killAll() exit(0)
d) 自定义任务模块开发
自定义任务模块必须定义一个task函数,作为入口函数。
def sqrt(n): return n**n def task(n=False): if n: r = sqrt(n) else: r = 0 return r