分布式任务系统 cobweb

1. 源码下载

请访问 GitHub 查看版本信息,下载最新源码

2. 功能特性

a) 基础功能

  • cobweb的基本功能,是在一台Server上分发任务,在其它多台服务器上部署Client程序,并执行相应的任务。
  • 任务函数代码编辑后在Server上发布,会自动分发到各台Client服务器上,并重载函数模块,使用新的执行代码。
  • 任务函数可选择是无参数运行,也可选择从Server端获取任务列表。
  • 任务函数可选择是运行后无返回,也可选择将运行结果提交到Server端

b) 特性

  • Client程序部署简单,只需要部署并配置后运行即可。
  • 一个Client可配置多个Server,来自不同Server的任务开启各自的进程执行。
  • 来自同一个Server,但不同的任务模块,有各自独立的线程并发运行,支持多CPU。
  • 任务代码分发、重载自动化,只需要维护Server服务器上的代码。
  • Server和Client之间的通信进行文本加密。

c) 使用场景

  • 分布式爬虫系统
  • 直播地址监测
  • 服务器监控

d) 语言环境

  • 基于Python 2.7
  • 服务端依赖web.py
  • 任务队列部分依赖Redis和pyredis

3. 开发说明

a) 源码结构

server

  • 服务端代码,模块包是Cobweb。
  • 自定义的任务函数文件保持在Modules目录下,必须使用xxxx.py的文件名。
  • 自定义的函数文件名以”lib_”开头时,可进行同步,供其它任务函数文件调用,但不会开启独立的任务线程。
  • Modules子目录的内容也会进行同步,供其它任务函数文件调用,但不会开启独立的任务线程。

client

  • 客户端代码,模块包是Cobweb。Modules目录保持为空即可,启动后client会自动同步server服务器上的Modules目录到本地

b) Server端开发

首先import相关模块

import json
import web
import Cobweb.Fname
import Cobweb.Task

初始化Fname模块,定义加密串,设计好web.py的路由。

# dict_string for encryption
dictString = "ThankYouForChoosingMoreTV"
 
Fname = Cobweb.Fname.Fname(dictString)
Task = Cobweb.Task.Task()
 
urls = (
    '/cobweb/index', 'index', # Provide modules' list
    '/cobweb/download/(.+)', 'download', # Provide module files
    '/cobweb/parameter', 'parameter', # Provide task para
    '/cobweb/result', 'result', # Get task's result 
    '/cobweb/reindex', 'reindex',  # Rebuild the index (Manual)
)

获取模块文件列表接口

class index:
    def GET(self):
        return Fname.index()  # Provide modules' list

下载模块文件接口

class download:
    def GET(self, fname):
        return Fname.files(fname) # Provide module files

手动刷新模块文件列表接口(可选,另一种做法是每次更新模块文件,重启Server端服务)

class reindex:
    def GET(self):
        return Fname.newindex() # Rebuild the index

启动Server端的web服务

if __name__ == "__main__":
    app = web.application(urls, globals())
    app.run()

同时可选提供任务分发接口,以及任务结果上传接口。 任务分发接口:

class parameter:
    def POST(self):
        data = web.input()
        Host = data["Host"] if "Host" in data else "unknow"
        Module = data["Module"] if "Module" in data else False
        ### not ready ###
        task = Task.request(Module, Host)
        #################
        para = Fname.encryption(json.dumps(task))
        return para

任务结果上传接口

class result:
    def POST(self):
        data = web.input()
        Host = data["Host"] if "Host" in data else "unknow"
        Module = data["Module"] if "Module" in data else False
        Result = data["Result"] if "Result" in data else False
        ### not ready ###
        status = Task.response(Module, Host, Result)
        #################
        return status

c) Client端开发

配置公共参数,包括client端的进程pid、日志log文件名,以及终端标识、日志级别(0-5)。当日志级别大于1时,作为后台进程模式运行。

pidFile = "cobweb.pid"
hostName = "TestHost"
logFile = "cobweb.log"
logLevel = 1 # if logLevel > 0 then run as a daemon

初始化参数,注意加密字符串必须保证和Server端相同。可配置多组参数,对应不同Server

taskList = [
    {
        "dictString" : "ThankYouForChoosingMoreTV",
        "indexUrl" : "http://127.0.0.1:8080/cobweb/index",
        "downloadUrl" : "http://127.0.0.1:8080/cobweb/download",
        "taskFreq" : 5, # Option
        "indexFreq" : 15, # Option        
        "taskUrl" : "http://127.0.0.1:8080/cobweb/list",  # Option
        "resultUrl" : "http://127.0.0.1:8080/cobweb/result", # Option
    }
]

加载模块,初始化并以进程模式,启动任务检查与执行。

import Cobweb.Process
import Cobweb.Thread
import Cobweb.Utiles
 
Logger = Cobweb.Utiles.Logger(logFile, logLevel)
Daemon = Cobweb.Process.Daemon(pidFile, Logger)
 
try:
    if 0 == logLevel:
        import time
        for task in taskList:
            Cobweb.Thread.Tasks(task, hostName, Logger).start()
        while True: time.sleep(9999)
    else:
        for task in taskList:
            Daemon.start(Cobweb.Thread.Tasks(task, hostName, Logger))
except (IOError,EOFError,KeyboardInterrupt):
    Daemon.killAll()
    exit(0)

d) 自定义任务模块开发

自定义任务模块必须定义一个task函数,作为入口函数。

def sqrt(n):
	return n**n
 
def task(n=False):
    if n:
        r = sqrt(n)
    else:
        r = 0
    return r