go实现支持大文件分块传输及断点续传的双向增量文件同步加密传输系统代码
代码语言:golang
所属分类:其他
代码描述:go实现支持大文件分块传输及断点续传的双向增量文件同步加密传输系统代码,客户端与服务端文件夹内文件实时保持同步,加密通讯传输。
代码标签: go 实时 大文件 分块 传输 断点 续传 双向 增量 文件 同步 加密 传输 系统 代码
下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开
// gosync_chunk.go
// 分块传输 + 断点续传 + TLS 加密 + 实时双向增量同步
//
// 用法:
// 监听端:go run gosync_chunk.go -listen :9999 -dir /path/A -pass "your-pass"
// 连接端:go run gosync_chunk.go -connect 1.2.3.4:9999 -dir /path/B -pass "your-pass"
//
// 选项:
// -tmpdir 临时目录(默认 <dir>/.gosync_tmp;不会被同步)
// -chunk 分块大小(字节,默认 524288 即 512KB)
// -skew mtime 允许偏差,默认 1s
// -v 详细日志
//
// 协议要点:
// - offer: 发送端提出要传哪个文件(含总大小、mtime、mode)
// - wantrange: 接收端响应需要从哪个偏移开始(支持断点),或 reject
// - chunk: 分块数据(off/size + 数据体),可与其他控制消息交错
// - putdone: 发送端宣告发送完毕,接收端将 .part 原子替换为目标文件
//
// 注意:
// - 临时文件保存在 tmpdir(默认 <dir>/.gosync_tmp),并被彻底排除扫描/监听/同步
// - 若 -tmpdir 指向不同分区,最终替换会回退为“复制+替换”(非严格原子,但不会在同步目录产生临时文件)
// - 初次对齐不做“缺失文件删除”,运行中删除会实时同步
// - 冲突按 mtime 新者为准(可用 -skew 微调)
package main
import (
"bufio"
"crypto/ed25519"
"crypto/sha256"
"crypto/tls"
"crypto/x509"
"crypto/x509/pkix"
"encoding/binary"
"encoding/hex"
"encoding/json"
"flag"
"fmt"
"io"
"io/fs"
"log"
"math/big"
"net"
"os"
"path/filepath"
"strings"
"sync"
"time"
"github.com/fsnotify/fsnotify"
"golang.org/x/crypto/scrypt"
)
// ==== 协议消息 ====
type Msg struct {
Type string `json:"type"` // "stat","statdone","pull","offer","wantrange","chunk","putdone","mkdir","del","ack","ping"
Path string `json:"path,omitempty"` // 相对路径(/ 分隔)
Size int64 `json:"size,omitempty"` // stat/offer: 总大小;chunk: 分块大小
Mode uint32 `json:"mode,omitempty"` // 文件/目录权限
MTime int64 `json:"mtime,omitempty"` // 修改时间(纳秒)
Dir bool `json:"dir,omitempty"` // 目录标志
Off int64 `json:"off,omitempty"` // chunk 偏移 / wantrange 的起始偏移
Reject bool `json:"reject,omitempty"` // wantrange 是否拒绝本次传输
Err string `json:"err,omitempty"` // 错误消息(ack)
Reason string `json:"reason,omitempty"` // 附加说明
}
type safeWriter struct {
mu sync.Mutex
bw *bufio.Writer
}
func (sw *safeWriter) SendHeader(msg *Msg) error {
b, err := json.Marshal(msg)
if err != nil {
return err
}
sw.mu.Lock()
defer sw.mu.Unlock()
var lenBuf [4]byte
binary.BigEndian.PutUint32(lenBuf[:], uint32(len(b)))
if _, err := sw.bw.Write(lenBuf[:]); err != nil {
return err
}
if _, err := sw.bw.Write(b); err != nil {
return err
}
return sw.bw.Flush()
}
func (sw *safeWriter) SendWithData(msg *Msg, r io.Reader) error {
b, err := json.Marshal(msg)
if err != nil {
return err
}
sw.mu.Lock()
defer sw.mu.Unlock()
var lenBuf [4]byte
binary.BigEndian.PutUint32(lenBuf[:], uint32(len(b)))
if _, err := sw.bw.Write(lenBuf[:]); err != nil {
return err
}
if _, err := sw.bw.Write(b); err != nil {
return err
}
if msg.Size > 0 && r != nil {
if _, err := io.CopyN(sw.bw, r, msg.Size); err != nil {
return err
}
}
return sw.bw.Flush()
}
func readHeader(br *bufio.Reader) (*Msg, error) {
var lenBuf [4]byte
if _, err := io.ReadFull(br, lenBuf[:]); err != nil {
return nil, err
}
n := binary.BigEndian.Uint32(lenBuf[:])
if n == 0 || n > 64*1024*1024 {
return nil, fmt.Errorf("invalid header length: %d", n)
}
b := make([]byte, n)
if _, err := io.ReadFull(br, b); err != nil {
return nil, err
}
var m Msg
if err := json.Unmarshal(b, &m); err != nil {
return nil, err
}
return &m, nil
}
// ==== 同步器 ====
type Syncer struct {
root string
rootAbs string
sw *safeWriter
br *bufio.Reader
watcher *fsnotify.Watcher
watching map[string]struct{}
wmu sync.Mutex
ignore map[string]time.Time
imu sync.Mutex
debounce map[string]*time.Timer
dmu sync.Mutex
skew time.Duration
verbose bool
chunkSize int64
closing chan struct{}
closedOnce sync.Once
// 临时目录
tmpDir string // 用户给的或默认
tmpDirAbs string
tmpDirNorm string
tmpRel string
// offer 等待表(用于等待对端的 wantrange)
offerMu sync.Mutex
offerWaiters map[string]chan *Msg
}
func normPath(p string) string {
ap, _ := filepath.Abs(p)
ap = filepath.Clean(ap)
ap = strings.ReplaceAll(ap, "\\", "/")
ap = strings.ToLower(ap)
return ap
}
func NewSyncer(root string, br *bufio.Reader, sw *safeWriter, skew time.Duration, verbose bool, tmpDirOpt string, chunkSize int64) (*Syncer, error) {
w, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
}
rootAbs, err := filepath.Abs(root)
if err != nil {
return nil, err
}
s := &Syncer{
root: root,
rootAbs: rootAbs,
br: br,
sw: sw,
watcher: w,
watching: make(map[string]struct{}),
ignore: make(map[string]time.Time),
debounce: make(map[string]*time.Timer),
skew: skew,
verbose: verbose,
chunkSize: chunkSize,
closing: make(chan struct{}),
offerWaiters: make(map[string]chan *Msg),
}
// 选择临时目录
if tmpDirOpt != "" {
s.tmpDir = tmpDirOpt
} else {
s.tmpDir = filepa.........完整代码请登录后点击上方下载按钮下载查看















网友评论0