go实现支持大文件分块传输及断点续传的双向增量文件同步加密传输系统代码

代码语言:golang

所属分类:其他

代码描述:go实现支持大文件分块传输及断点续传的双向增量文件同步加密传输系统代码,客户端与服务端文件夹内文件实时保持同步,加密通讯传输。

代码标签: go 实时 大文件 分块 传输 断点 续传 双向 增量 文件 同步 加密 传输 系统 代码

下面为部分代码预览,完整代码请点击下载或在bfwstudio webide中打开

// gosync_chunk.go
// 分块传输 + 断点续传 + TLS 加密 + 实时双向增量同步
//
// 用法:
// 监听端:go run gosync_chunk.go -listen :9999 -dir /path/A -pass "your-pass"
// 连接端:go run gosync_chunk.go -connect 1.2.3.4:9999 -dir /path/B -pass "your-pass"
//
// 选项:
// -tmpdir   临时目录(默认 <dir>/.gosync_tmp;不会被同步)
// -chunk    分块大小(字节,默认 524288 即 512KB)
// -skew     mtime 允许偏差,默认 1s
// -v        详细日志
//
// 协议要点:
// - offer:     发送端提出要传哪个文件(含总大小、mtime、mode)
// - wantrange: 接收端响应需要从哪个偏移开始(支持断点),或 reject
// - chunk:     分块数据(off/size + 数据体),可与其他控制消息交错
// - putdone:   发送端宣告发送完毕,接收端将 .part 原子替换为目标文件
//
// 注意:
// - 临时文件保存在 tmpdir(默认 <dir>/.gosync_tmp),并被彻底排除扫描/监听/同步
// - 若 -tmpdir 指向不同分区,最终替换会回退为“复制+替换”(非严格原子,但不会在同步目录产生临时文件)
// - 初次对齐不做“缺失文件删除”,运行中删除会实时同步
// - 冲突按 mtime 新者为准(可用 -skew 微调)

package main

import (
	"bufio"
	"crypto/ed25519"
	"crypto/sha256"
	"crypto/tls"
	"crypto/x509"
	"crypto/x509/pkix"
	"encoding/binary"
	"encoding/hex"
	"encoding/json"
	"flag"
	"fmt"
	"io"
	"io/fs"
	"log"
	"math/big"
	"net"
	"os"
	"path/filepath"
	"strings"
	"sync"
	"time"

	"github.com/fsnotify/fsnotify"
	"golang.org/x/crypto/scrypt"
)

// ==== 协议消息 ====

type Msg struct {
	Type   string `json:"type"`             // "stat","statdone","pull","offer","wantrange","chunk","putdone","mkdir","del","ack","ping"
	Path   string `json:"path,omitempty"`   // 相对路径(/ 分隔)
	Size   int64  `json:"size,omitempty"`   // stat/offer: 总大小;chunk: 分块大小
	Mode   uint32 `json:"mode,omitempty"`   // 文件/目录权限
	MTime  int64  `json:"mtime,omitempty"`  // 修改时间(纳秒)
	Dir    bool   `json:"dir,omitempty"`    // 目录标志
	Off    int64  `json:"off,omitempty"`    // chunk 偏移 / wantrange 的起始偏移
	Reject bool   `json:"reject,omitempty"` // wantrange 是否拒绝本次传输
	Err    string `json:"err,omitempty"`    // 错误消息(ack)
	Reason string `json:"reason,omitempty"` // 附加说明
}

type safeWriter struct {
	mu sync.Mutex
	bw *bufio.Writer
}

func (sw *safeWriter) SendHeader(msg *Msg) error {
	b, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	sw.mu.Lock()
	defer sw.mu.Unlock()
	var lenBuf [4]byte
	binary.BigEndian.PutUint32(lenBuf[:], uint32(len(b)))
	if _, err := sw.bw.Write(lenBuf[:]); err != nil {
		return err
	}
	if _, err := sw.bw.Write(b); err != nil {
		return err
	}
	return sw.bw.Flush()
}

func (sw *safeWriter) SendWithData(msg *Msg, r io.Reader) error {
	b, err := json.Marshal(msg)
	if err != nil {
		return err
	}
	sw.mu.Lock()
	defer sw.mu.Unlock()
	var lenBuf [4]byte
	binary.BigEndian.PutUint32(lenBuf[:], uint32(len(b)))
	if _, err := sw.bw.Write(lenBuf[:]); err != nil {
		return err
	}
	if _, err := sw.bw.Write(b); err != nil {
		return err
	}
	if msg.Size > 0 && r != nil {
		if _, err := io.CopyN(sw.bw, r, msg.Size); err != nil {
			return err
		}
	}
	return sw.bw.Flush()
}

func readHeader(br *bufio.Reader) (*Msg, error) {
	var lenBuf [4]byte
	if _, err := io.ReadFull(br, lenBuf[:]); err != nil {
		return nil, err
	}
	n := binary.BigEndian.Uint32(lenBuf[:])
	if n == 0 || n > 64*1024*1024 {
		return nil, fmt.Errorf("invalid header length: %d", n)
	}
	b := make([]byte, n)
	if _, err := io.ReadFull(br, b); err != nil {
		return nil, err
	}
	var m Msg
	if err := json.Unmarshal(b, &m); err != nil {
		return nil, err
	}
	return &m, nil
}

// ==== 同步器 ====

type Syncer struct {
	root       string
	rootAbs    string
	sw         *safeWriter
	br         *bufio.Reader
	watcher    *fsnotify.Watcher
	watching   map[string]struct{}
	wmu        sync.Mutex
	ignore     map[string]time.Time
	imu        sync.Mutex
	debounce   map[string]*time.Timer
	dmu        sync.Mutex
	skew       time.Duration
	verbose    bool
	chunkSize  int64
	closing    chan struct{}
	closedOnce sync.Once

	// 临时目录
	tmpDir     string // 用户给的或默认
	tmpDirAbs  string
	tmpDirNorm string
	tmpRel     string

	// offer 等待表(用于等待对端的 wantrange)
	offerMu      sync.Mutex
	offerWaiters map[string]chan *Msg
}

func normPath(p string) string {
	ap, _ := filepath.Abs(p)
	ap = filepath.Clean(ap)
	ap = strings.ReplaceAll(ap, "\\", "/")
	ap = strings.ToLower(ap)
	return ap
}

func NewSyncer(root string, br *bufio.Reader, sw *safeWriter, skew time.Duration, verbose bool, tmpDirOpt string, chunkSize int64) (*Syncer, error) {
	w, err := fsnotify.NewWatcher()
	if err != nil {
		return nil, err
	}
	rootAbs, err := filepath.Abs(root)
	if err != nil {
		return nil, err
	}
	s := &Syncer{
		root:         root,
		rootAbs:      rootAbs,
		br:           br,
		sw:           sw,
		watcher:      w,
		watching:     make(map[string]struct{}),
		ignore:       make(map[string]time.Time),
		debounce:     make(map[string]*time.Timer),
		skew:         skew,
		verbose:      verbose,
		chunkSize:    chunkSize,
		closing:      make(chan struct{}),
		offerWaiters: make(map[string]chan *Msg),
	}
	// 选择临时目录
	if tmpDirOpt != "" {
		s.tmpDir = tmpDirOpt
	} else {
		s.tmpDir = filepa.........完整代码请登录后点击上方下载按钮下载查看

网友评论0