Calling a custom HBase coprocessor from Go: a client-side implementation

While recently porting a Java HBase project to Go, I used the open-source library github.com/tsuna/gohbase, but it has no support for calling custom coprocessors, so I had to extend its source code myself to add that capability. The extension code is posted here in the hope that it helps others with the same need.

# Implementation approach

From the point of view of the code, a coprocessor call can be understood as an enhanced scan:

- Overall, the extension can reuse the structure of the existing scan implementation.
- Judging from the .proto files, the difference from scan is that scan's request and response messages are fully defined, whereas a coprocessor's request and response payloads are opaque []byte arrays.

These two points fix the overall approach; the concrete work still means digging into the code. You need to understand how github.com/tsuna/gohbase wraps its protobuf packets and how its scan walks the data across all region servers, and you also need to understand the Java hbase-client: what the method name of a coprocessor packet is, how the []byte payload is assembled, and how it is decoded.
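Before the extension code, here is a minimal standalone sketch of mine (not part of the extension itself) of what that opaque wrapping looks like, using the pb.CoprocessorServiceCall type from gohbase that the code below also relies on. The Request field simply carries the protobuf-encoded request of whatever endpoint you call; the service and method names here mirror the Count example further down.

package main

import (
    "fmt"

    "github.com/golang/protobuf/proto"
    "github.com/tsuna/gohbase/pb"
)

func main() {
    // Unlike scan, HBase does not know the shape of a coprocessor payload:
    // the endpoint's own request message is marshalled to []byte and carried
    // in CoprocessorServiceCall.Request, and the response comes back as raw
    // bytes that the client has to decode itself.
    serviceName := "Count"           // service declared in the endpoint's .proto
    methodName := "sendCountRequest" // rpc declared in the endpoint's .proto
    call := &pb.CoprocessorServiceCall{
        Row:         []byte("someStartRow"),
        ServiceName: &serviceName,
        MethodName:  &methodName,
        Request:     []byte{}, // proto.Marshal(yourEndpointRequest) goes here
    }
    raw, err := proto.Marshal(call)
    fmt.Println(len(raw), err)
}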

# Implementation source code

I extended the source code so that it is easy to issue a custom coprocessor request and read its response; the code below may help people with the same needs.

hrpc/corprocessor.go

package hrpc

import (
    "github.com/tsuna/gohbase/pb"
    "github.com/golang/protobuf/proto"
    "context"
    "github.com/tsuna/gohbase/filter"
)

type Coprocessor struct {
    base
    corRquest ServiceRequest
}

func baseCoprocessor(ctx context.Context, table []byte,request ServiceRequest,
    options ...func(Call) error) (*Coprocessor, error) {
    s := &Coprocessor{
        base: base{
            table: table,
            ctx:   ctx,
        },
        corRquest:request,
    }
    err := applyOptions(s, options...)
    if err != nil {
        return nil, err
    }

    return s, nil
}

func NewCoprocessor(ctx context.Context, table string,request ServiceRequest, options ...func(Call) error)(*Coprocessor, error){
    return baseCoprocessor(ctx,[]byte(table),request,options... )
}

func NewRangeCoprocessor(ctx context.Context, table string,startKey []byte ,request ServiceRequest, options ...func(Call) error)(*Coprocessor, error){
    cor,err  := baseCoprocessor(ctx,[]byte(table),request,options... )
    if err  != nil{
        return nil,err
    }
    cor.key =startKey

    return cor,nil
}


type CoprocessorIte interface {

    Next() (ServiceResponse, error)

    Close() error
}

type ServiceRequest interface{
    GetServiceName() string
    GetMethodName() string
    GetRow() []byte
    GetProtoRequest() ([]byte,error)
    GetStartRow() string
    GetEndRow() string
    SetRow([]byte)

}
type ServiceResponse interface{
    GetResponse(responseByte []byte) ServiceResponse

}





// Name returns the name of this RPC call.
func (s *Coprocessor) Name() string {
    return "ExecService"
}

// ToProto converts this Coprocessor call into a protobuf message
func (s *Coprocessor) ToProto() (proto.Message, error) {
    coprocessor := &pb.CoprocessorServiceRequest{
        Region:       s.regionSpecifier(),
        Call: &pb.CoprocessorServiceCall{},
    }
    println("ToProto coprocessor.Call.Row==,"+string(s.corRquest.GetRow()))
    coprocessor.Call.Row = s.corRquest.GetRow()
    serviceName := s.corRquest.GetServiceName()
    coprocessor.Call.ServiceName = &serviceName
    methodName := s.corRquest.GetMethodName()
    coprocessor.Call.MethodName = &methodName


    if  requstBytes,err :=  s.corRquest.GetProtoRequest(); err == nil{
        coprocessor.Call.Request = requstBytes
    }else{
        return nil, err
    }
    return coprocessor, nil
}

// NewResponse creates an empty protobuf message to read the response
// of this RPC.
func (s *Coprocessor) NewResponse() proto.Message {
    return &pb.CoprocessorServiceResponse{}
}

// SetFamilies is a no-op; it exists only so Coprocessor can satisfy the same call interface as Scan.
func (s *Coprocessor) SetFamilies(fam map[string][]string) error {
    return nil
}

// SetFilter is likewise a no-op for coprocessor calls.
func (s *Coprocessor) SetFilter(ft filter.Filter) error {
    return nil
}


// GetCorRequest returns the user-supplied ServiceRequest.
func (s *Coprocessor) GetCorRequest() ServiceRequest {
    return s.corRquest
}


// SetCorRequest replaces the user-supplied ServiceRequest.
func (s *Coprocessor) SetCorRequest(request ServiceRequest) error {
    s.corRquest = request
    return nil
}

func (s *Coprocessor) StopRow() []byte {
    return []byte(s.corRquest.GetEndRow())
}

// StartRow returns the start key (inclusive) of this scanner.
func (s *Coprocessor) StartRow() []byte {
    return []byte(s.corRquest.GetStartRow())
}

corprocessor_ite.go

package gohbase

import (
    "github.com/tsuna/gohbase/hrpc"
    "sync"
    "context"
    "io"
    "github.com/tsuna/gohbase/pb"
    "errors"
    "bytes"
)

type coprocessorRe struct{
    r *pb.NameBytesPair
    e error
}


type coprocessorIte struct {
    // fetcher's fields shouldn't be accessed by the iterator
    // TODO: maybe separate fetcher into a different package
    f       coprocessorFetcher
    once    sync.Once
    resultsCh chan coprocessorRe
    resultsM sync.Mutex
    results   *pb.NameBytesPair
    cancel  context.CancelFunc
}

func (s *coprocessorIte) peek() (*pb.NameBytesPair, error) {
    if s.f.ctx.Err() != nil {
        return nil, io.EOF
    }
    if s.results == nil   {
        re, ok := <-s.resultsCh
        if !ok {
            return nil, io.EOF
        }
        if re.e != nil {
            return nil, re.e
        }
        // fetcher never returns empty results
        s.results = re.r
    }
    return s.results, nil
}

func (s *coprocessorIte) shift() {
    if len(s.results.Value) == 0 {
        return
    }
    // set to nil so that the GC can clean up the result
    s.results = nil
}

// coalesce exists to mirror the scan implementation; coprocessor responses
// are returned per region as-is, so it simply hands the partial result back.
func (s *coprocessorIte) coalesce(result, partial *pb.NameBytesPair) (*pb.NameBytesPair, bool) {
    return partial, true
}


func newCoprocessorIte(c RPCClient, rpc *hrpc.Coprocessor,result hrpc.ServiceResponse) *coprocessorIte {
    ctx, cancel := context.WithCancel(rpc.Context())
    results := make(chan coprocessorRe)
    return &coprocessorIte{
        resultsCh: results,
        cancel:  cancel,
        f: coprocessorFetcher{
            RPCClient: c,
            rpc:       rpc,
            ctx:       ctx,
            resultsCh:   results,
            result:result,
            startRow: rpc.StartRow(),
        },
    }
}

func toLocalServiceResult(r *pb.NameBytesPair,respon hrpc.ServiceResponse) hrpc.ServiceResponse {
    if r == nil {
        return nil
    }
    return respon.GetResponse(r.Value)
}




func (s *coprocessorIte) Next() (hrpc.ServiceResponse, error) {
    s.once.Do(func() {
        go s.f.fetch()
    })
    s.resultsM.Lock()
    println("track Next() 1")

    var result, partial *pb.NameBytesPair
    var err error
    for {
        partial, err = s.peek()
        println("track partial ",partial)
        println("track partial ",err)
        if err == io.EOF && result != nil {
            // no more results, return what we have. Next call to the Next() will get EOF
            result.Value = []byte("")
            s.resultsM.Unlock()
            println("track Next() 2")
            return toLocalServiceResult(result,s.f.result), nil
        }
        if err != nil {

            // return whatever we have so far and the error
            s.resultsM.Unlock()
            println("track Next() 3",err.Error())
            return toLocalServiceResult(result,s.f.result), err
        }

        var done bool
        result, done = s.coalesce(result, partial)
        if done {
            s.shift()
        }

        s.resultsM.Unlock()
        println("track Next() 4")
        return  toLocalServiceResult(result,s.f.result), nil

    }

}

func (s *coprocessorIte) Close() error {
    s.cancel()
    return nil
}

type coprocessorFetcher struct {
    RPCClient
    resultsCh chan<- coprocessorRe
    // rpc is the original coprocessor query
    rpc *hrpc.Coprocessor
    ctx context.Context
    // startRow is the row the next per-region request will start from
    startRow []byte
    // result is the user-supplied ServiceResponse used to decode raw payloads
    result hrpc.ServiceResponse
}



func (f *coprocessorFetcher) trySend(rs  *pb.NameBytesPair, err error) bool {
    if err == nil && len(rs.Value) == 0 {
        return true
    }
    select {
    case <-f.ctx.Done():
        return true
    case f.resultsCh <- coprocessorRe{r: rs, e: err}:
        return false
    }
}


// fetch calls the coprocessor on each region in turn, sends the results to
// the client and advances the fetcher to the next region
func (f *coprocessorFetcher) fetch() {
    for {
        resp, region, err := f.next()
        if err != nil {
            if err != ErrDeadline {
                // if the context of the coprocessor rpc wasn't cancelled (same as calling Close()),
                // return the error to client
                f.trySend(nil, err)
            }
            break
        }
        f.update( region)
        if f.trySend(resp.Value,nil ){
            break
        }
        // check whether we should close the scanner before making next request
        if f.shouldClose(resp, region) {
            break
        }
    }

    close(f.resultsCh)

}

func (f *coprocessorFetcher) next() (*pb.CoprocessorServiceResponse, hrpc.RegionInfo, error) {
    rpc, err := hrpc.NewRangeCoprocessor(f.ctx, string(f.rpc.Table()), f.startRow, f.rpc.GetCorRequest())
    if err != nil {
        return nil, nil, err
    }

    res, err := f.SendRPC(rpc)
    if err != nil {
        return nil, nil, err
    }
    coprocessors, ok := res.(*pb.CoprocessorServiceResponse)
    if !ok {
        return nil, nil, errors.New("got non-CoprocessorServiceResponse for coprocessor request")
    }
    return coprocessors, rpc.Region(), nil
}


// update advances the fetcher so that the next request starts at the next region
func (f *coprocessorFetcher) update(region hrpc.RegionInfo) {
    f.startRow = region.StopKey()
}


// shouldClose checks whether the iterator should be closed and stop fetching new results
func (f *coprocessorFetcher) shouldClose(resp *pb.CoprocessorServiceResponse, region hrpc.RegionInfo) bool {

    select {
    case <-f.ctx.Done():
        // scanner has been asked to close
        return true
    default:
    }
    // Check to see if this region is the last we should scan because:
    // (1) it's the last region
    if len(region.StopKey()) == 0 {
        return true
    }
    // (3) because its stop_key is greater than or equal to the stop_key of this scanner,
    // provided that (2) we're not trying to scan until the end of the table.
    return len(f.rpc.StopRow()) != 0 && // (2)
        bytes.Compare(f.rpc.StopRow(), region.StopKey()) <= 0 // (3)
}

extend_client.go

package gohbase

import (
    "github.com/tsuna/gohbase/hrpc"
)

type ExtendClient interface{
    Client
    ExecService(request *hrpc.Coprocessor,response hrpc.ServiceResponse) hrpc.CoprocessorIte
}


type extendClient struct {
    *client
}

// NewExtendClient creates a new HBase client that can also execute coprocessor endpoints.
func NewExtendClient(zkquorum string, options ...Option) ExtendClient {
    return  &extendClient{
        client:newClient(zkquorum, options...),
    }
}



func (c *extendClient) ExecService(request *hrpc.Coprocessor,response hrpc.ServiceResponse) hrpc.CoprocessorIte {
    return newCoprocessorIte(c,request,response)
}

Example: I have a custom coprocessor called Count deployed in HBase; its proto definition, countServer.proto, is as follows:

package protob;
option java_package = "com.bwdz.coprocessor.endpoint.count";
import "params.proto";
option java_outer_classname = "CountServer";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

message CountRequest {
  required string startKey = 1;
  required string endKey = 2;
  repeated Params params = 3;
  required string column = 4;
  optional string defaultQualifier = 5;
}

message CountResponse {
  required int64 count = 1 [default = 0];
}

service Count {
  rpc sendCountRequest(CountRequest)
    returns (CountResponse);

}
  1. Use protoc with the Go protobuf plugin to generate CountServer.pb.go from countServer.proto (the generated types have roughly the shape sketched below).
  2. Create request and response types that implement hrpc.ServiceRequest and hrpc.ServiceResponse for the coprocessor call.
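For orientation, here is a rough sketch of the generated types, inferred from how their fields are used in count_service.go below; it is not the actual generated file. With the classic github.com/golang/protobuf generator, proto2 scalar fields come out as pointers.

package protob

// Sketch of the generated message types; the real CountServer.pb.go and
// params.pb.go also contain Reset/String/ProtoMessage methods and
// descriptor plumbing.
type CountRequest struct {
    StartKey         *string
    EndKey           *string
    Params           []*Params
    Column           *string
    DefaultQualifier *string
}

type CountResponse struct {
    Count *int64
}

// Params is generated from params.proto; its fields are inferred from how
// count_service.go fills them in below.
type Params struct {
    Key   *string
    Value *string
    Type  *string
}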

count_service.go

package corpro

import (
    "zrsf.com/hquery/protob"
    "github.com/tsuna/gohbase/hrpc"
    "github.com/golang/protobuf/proto"
)

type CountServiceResponse struct {
    Count int64
}

func (c *CountServiceResponse) GetResponse(responseByte []byte) hrpc.ServiceResponse {
    res := protob.CountResponse{}
    rsp := &CountServiceResponse{
    }
    if err := proto.Unmarshal(responseByte,&res);err== nil{
        rsp.Count = *res.Count
        return rsp
    }
    return nil
}

func NewCountServiceResponse() *CountServiceResponse{
    return  &CountServiceResponse{}
}



type countServiceRequest struct {
    startKey   string
    endKey           string
    params           []*Params
    column           string
    defaultQualifier string
}

func NewCountServiceRequest(start ,end ,family,defaultQualifier  string , params    []*Params) *countServiceRequest{
    return  &countServiceRequest{
        startKey:start,
        endKey:end,
        params:params,
        column:family,
        defaultQualifier:defaultQualifier,
    }
}

// GetServiceName returns the coprocessor service name declared in the .proto file.
func (s *countServiceRequest) GetServiceName() string {
    return "Count"
}


func (s *countServiceRequest) GetMethodName() string {
    return "sendCountRequest"
}

func (s *countServiceRequest) GetRow() []byte{
    return []byte(s.startKey)
}

func (s *countServiceRequest) GetProtoRequest() ([]byte,error) {
    var cusParams []*protob.Params
    for _,param := range s.params{
        pa := &protob.Params{
            Key: &(param.Key),
            Value: &(param.Value),
            Type: &(param.Type),
        }
        cusParams = append(cusParams,pa)
    }
    requstMessage := protob.CountRequest{
        StartKey:&s.startKey,
        EndKey:&s.endKey,
        Params:cusParams,
        Column:&s.column,
        DefaultQualifier:&s.defaultQualifier,
    }

    return proto.Marshal(&requstMessage)
}
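Note that hrpc.ServiceRequest, as defined above, also requires GetStartRow, GetEndRow and SetRow, which are not shown in this snippet. A minimal sketch of how they might look for countServiceRequest, assuming startKey/endKey map directly to the iteration range:

// Sketch of the remaining hrpc.ServiceRequest methods (not shown in the
// original snippet). GetStartRow/GetEndRow expose the iteration range and
// SetRow simply resets the start key.
func (s *countServiceRequest) GetStartRow() string {
    return s.startKey
}

func (s *countServiceRequest) GetEndRow() string {
    return s.endKey
}

func (s *countServiceRequest) SetRow(row []byte) {
    s.startKey = string(row)
}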
  3. Use the coprocessor just like a scan:
countRequest := corpro.NewCountServiceRequest(
    "000000000000000000001607220002340012321933", "99999999999999999999999999999999999999999",
    "DATA", "JSHJ", nil,
)
corp, err := hrpc.NewCoprocessor(context.Background(), "KPF_INDEX", countRequest)
if err == nil {
    resp := corpro.NewCountServiceResponse()
    ite := util.GetHbaseClient().ExecService(corp, resp)
    result, err := ite.Next()
    for err != io.EOF {
        if coutRes, ok := result.(*corpro.CountServiceResponse); ok {
            println("ssss", coutRes.Count)
        }
        result, err = ite.Next()
    }
}

# Pitfalls

1. Getting started is always the hardest part. At first I could not figure out what the coprocessor's method name was supposed to be; I finally found the constant "ExecService" in an article about how hbase-client is implemented. How the []byte payload is assembled is also visible in hbase-client: it is simply the protobuf encoding of the request object, and the response is decoded the same way. Also, since this was my first time using protobuf, it took me quite a while to realize that the serviceName and methodName are both right there in the .proto file.
2. While imitating the scan implementation for the coprocessor iteration, I initially neglected to update startRow. Development used a standalone HBase with very little data, so each table had only one region and the bug went unnoticed; testing used an HBase cluster, but the data volume was still so small that the test table had only one region, so the bug survived there as well. When the code was finally handed over to QA it ran into an endless loop~~~
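For reference, the fix is the startRow advance in update() together with the termination test in shouldClose() shown earlier. The sketch below is a condensed, standalone restatement of that termination condition, to show why the loop never ends when startRow is not advanced:

package main

import (
    "bytes"
    "fmt"
)

// done mirrors shouldClose above: iteration ends when the current region is
// the last one in the table (empty stop key) or when its stop key already
// reaches the coprocessor's stop row. If startRow is never advanced to
// region.StopKey(), next() keeps requesting the same region and the loop
// never terminates.
func done(stopRow, regionStopKey []byte) bool {
    if len(regionStopKey) == 0 {
        return true
    }
    return len(stopRow) != 0 && bytes.Compare(stopRow, regionStopKey) <= 0
}

func main() {
    fmt.Println(done([]byte("zzz"), []byte("row500"))) // false: fetch the next region
    fmt.Println(done([]byte("zzz"), []byte{}))         // true: last region in the table
}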

# Summary

When porting an implementation from one language to another, you find that gaining a deeper understanding of the new language in turn deepens your understanding of the language you used before; it becomes a virtuous circle.