root@VM-0-15-ubuntu:/etc/nginx# mysql -h 127.0.0.1 -P 3307 -uroot -p Enter password: Welcome to the MySQL monitor. Commands end with ; or \g. Your MySQL connection id is 27868 Server version: 5.7.23-0ubuntu0.16.04.1-log (Ubuntu)
Copyright (c) 2000, 2018, Oracle and/or its affiliates. All rights reserved.
Oracle is a registered trademark of Oracle Corporation and/or its affiliates. Other names may be trademarks of their respective owners.
Type 'help;' or '\h' for help. Type '\c' to clear the current input statement.
// someOperation stands in for one unit of real work. Here it only sleeps for
// one second and reports success. If the work produced data, a channel would
// be the way to hand that data back to the caller.
func someOperation() error {
	time.Sleep(1 * time.Second)
	return nil
}
// anotherOperation stands in for a second unit of work that is independent of
// someOperation. Here it only sleeps for one second and reports success.
func anotherOperation() error {
	time.Sleep(1 * time.Second)
	return nil
}
funcbizFunc()error { wg := sync.WaitGroup{} // sync.WatiGroup to sync goroutine wg.Add(2) // we have 2 operation to do, so we add 2 gofunc() { err := someOperation() if err != nil { // whatever handler } wg.Done() }() gofunc(){ err := anotherOperation() wg.Done() }() wg.Wait() // wait all goroutine to return // other operation depend on the two before }
// SizeWaitGroup is a wait group with an upper bound on how many goroutines
// may be registered at the same time.
type SizeWaitGroup struct {
	// buf is a buffered channel; each queued element stands for one goroutine
	// currently registered, so its capacity is the concurrency limit.
	buf chan struct{}
	// wg is the underlying wait group.
	wg sync.WaitGroup
}
// NewSizeWaitGroup wait group with limit funcNewSizeWaitGroup(size int) *SizeWaitGroup { if size <= 0 { size = defaultSize } return &SizeWaitGroup{ buf: make(chanstruct{}, size), // init the size of channel wg: sync.WaitGroup{}, } }
// AddWithContext // blocking if the number of goroutines has been reached func(c *SizeWaitGroup) AddWithContext(ctx context.Context) error { // select { case <-ctx.Done(): // parent goroutines call canceled or timedout or other happend return ctx.Err() case c.buf <- struct{}{}: // block if channel is full break } c.wg.Add(1) // we created a goroutine returnnil }
// MetaDataInterceptor get grpc server info, requestId/traceId/LogId funcMetaServerDataInterceptor() grpc.UnaryServerInterceptor { // 拦截器函数签名 // @params ctx Grpc context // @params req grpc request // @params info grpc request info // @params handler the grpc method returnfunc(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { // do what you want to do // get metadata from grpc client md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.Pairs() } // Set request info for context. // define your key for _, key := range []string{"requestId"} { value := md.Get(key) // ignore it if not exists. iflen(value) >= 1 { // set value to context. you can use ctx.Value to get it from your grpc method ctx = context.WithValue(ctx, key, value[0]) } } // next return handler(ctx, req) } }
// API time elas time get grpc server info funcAPITimeInterceptor() grpc.UnaryServerInterceptor { // 拦截器签名 returnfunc(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error) { // do what you want to do start := time.Now().UnixNano() // do gRPC method ret := handler(ctx, req) // do what you want after the grpc method fmt.Println(time.Now().UnixNano() - start) return ret } }
// WrappedStream wraps around the embedded grpc.ServerStream, and intercepts the Context type WrappedStream struct { grpc.ServerStream // serverStream interface Ctx *context.Context // 定义ctx,覆盖ServerStream中的context }
// Context override the context method and can config the context manually func(c WrappedStream) Context() context.Context { return *c.Ctx }
// stream method to get meta data funcMetaStreamServerInterceptor() grpc.StreamServerInterceptor { // 函数签名 returnfunc( srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler)error { // 获取当前 grpc context ctx := ss.Context() md, ok := metadata.FromIncomingContext(ctx) if !ok { md = metadata.Pairs() } // Set request info for context. // define your key for _, key := range []string{"requestId"} { value := md.Get(key) // ignore it if not exists. iflen(value) >= 1 { // set value to context. you can use ctx.Value to get it from your grpc method ctx = context.WithValue(ctx, key, value[0]) } } // set context to next return handler(srv, streaminterceptor.NewWrappedStream(ss, &ctx)) } }
// MetaStreamClientInterceptor get grpc client info, requestId/traceId/LogId for grpc stream server funcMetaStreamClientInterceptor() grpc.StreamClientInterceptor { // 函数签名 returnfunc(ctx context.Context, desc *grpc.StreamDesc, cc *grpc.ClientConn, method string, streamer grpc.Streamer, opts ...grpc.CallOption) (grpc.ClientStream, error) {
// 从context获取元数据 md, ok := metadata.FromOutgoingContext(ctx) if !ok { md = metadata.Pairs() } for _, key := range keyNames { value := ctx.Value(key) if strValue, ok := value.(string); ok && strValue != "" { md.Set(key, strValue) } } // set metadata to ctx ctx = metadata.NewOutgoingContext(ctx, md)
// StreamServerInterceptor provides a hook to intercept the execution of a streaming RPC on the server.
// info contains all the information of this RPC the interceptor can operate on. And handler is the
// service method implementation. It is the responsibility of the interceptor to invoke handler to
// complete the RPC.
func(srv interface{}, ss grpc.ServerStream, info *grpc.StreamServerInfo, handler grpc.StreamHandler) error
// @params ctx: grpc context
// @params req: the request params
// @params info: the grpc request info
// @params handler: the real grpc method
func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (resp interface{}, err error)
客户端普通拦截器
golang在调用grpc之前执行的公共的操作,比如要把requestId塞到header中。
1 2 3 4 5 6 7 8 9 10 11
// @params method: the RPC name
// @params req: the request
// @params resp: the response
// @params cc: the ClientConn on which the RPC was invoked
// @params invoker: the invoker of the grpc method
// @params opts: the options
func(
	ctx context.Context,
	method string,
	req, resp interface{},
	cc *grpc.ClientConn,
	invoker grpc.UnaryInvoker,
	opts ...grpc.CallOption,
) error
客户端流式拦截器
实现如下签名的函数即可
1 2 3 4 5 6
// @params desc: contains a description of the stream
// @params cc: the ClientConn on which the RPC was invoked
// @params method: the RPC name
// @params streamer: the handler to create a ClientStream and it is the responsibility of the interceptor to call it
// @params opts: the options
func(ctx context.Context, desc *StreamDesc, cc *ClientConn, method string, streamer Streamer, opts ...CallOption) (ClientStream, error)
"dynamic_templates": [
  {
    "my_template_name": {
      ... match conditions ...
      "mapping": { ... }   # the mapping that a matched field should use
    }
  },
  ...
]
# The match conditions can include any of: match_mapping_type, match, match_pattern, unmatch, path_match, path_unmatch.
Turns on compression of the snapshot files. Compression is applied only to metadata files (index mapping and settings). Data files are not compressed. Defaults to true.
chunk_size
Big files can be broken down into chunks during snapshotting if needed. Specify the chunk size as a value and unit, for example: 1GB, 10MB, 5KB, 500B. Defaults to null (unlimited chunk size).
max_restore_bytes_per_sec
Throttles per node restore rate. Defaults to 40mb per second.
max_snapshot_bytes_per_sec
Throttles per node snapshot rate. Defaults to 40mb per second.
Interceptor 有点类似于我们平时常用的 HTTP Middleware,不同的是它可以用在 Client 端和 Server 端。比如在收到请求之后输出日志,在请求出现错误的时候输出错误信息,比如获取请求中设置的 Request ID。
Golang实现
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19
// UnaryInvoker is called by UnaryClientInterceptor to complete RPCs. type UnaryInvoker func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, opts ...CallOption)error
// UnaryClientInterceptor intercepts the execution of a unary RPC on the client. invoker is the handler to complete the RPC // and it is the responsibility of the interceptor to call it. // This is an EXPERIMENTAL API. type UnaryClientInterceptor func(ctx context.Context, method string, req, reply interface{}, cc *ClientConn, invoker UnaryInvoker, opts ...CallOption)error
// UnaryHandler defines the handler invoked by UnaryServerInterceptor to
// complete the normal execution of a unary RPC. If a UnaryHandler returns an
// error, it should be produced by the status package, or else gRPC will use
// codes.Unknown as the status code and err.Error() as the status message of
// the RPC.
type UnaryHandler func(ctx context.Context, req interface{}) (interface{}, error)
// UnaryServerInterceptor provides a hook to intercept the execution of a unary RPC on the server. info // contains all the information of this RPC the interceptor can operate on. And handler is the wrapper // of the service method implementation. It is the responsibility of the interceptor to invoke handler // to complete the RPC. type UnaryServerInterceptor func(ctx context.Context, req interface{}, info *UnaryServerInfo, handler UnaryHandler) (resp interface{}, err error)
[root@VM-145-82-centos ~]# yum install logrotate Loaded plugins: fastestmirror Loading mirror speeds from cached hostfile Package logrotate-3.8.6-14.tl2.x86_64 already installed and latest version Nothing to do [root@VM-145-82-centos ~]#
1 2 3 4 5 6
root@VM-0-15-ubuntu:[10:59:47]:~# apt-get install logrotate Reading package lists... Done Building dependency tree Reading state information... Done logrotate is already the newest version (3.8.7-2ubuntu2.16.04.2). 0 upgraded, 0 newly installed, 0 to remove and 385 not upgraded.
[root@VM-145-82-centos /etc/logrotate.d/cron.30m]# logrotate -d /etc/logrotate.d/cron.30m/* reading config file /etc/logrotate.d/cron.30m/nginx Allocating hash table for state file, size 15360 B
Handling 1 logs
rotating pattern: /var/log/nginx/*log after 1 days (10 rotations) empty log files are not rotated, old logs are removed considering log /var/log/nginx/access.log log does not need rotating (log is empty)considering log /var/log/nginx/error.log log does not need rotating (log is empty)not running postrotate script, since no logs were rotated
执行后,会输出文件怎么变更,压缩重命名等。比如上面会提示 “old logs are removed” “10 rotate” 等记录。
deferclose(data) // consumer gofunc() { for { select { case <-done: fmt.Println("child process interrupt...") return default: fmt.Printf("send message: %d\n", <-data) time.Sleep(time.Second * 1) } } }()
// producer for i := 0; i < 10; i++ { data <- i } time.Sleep(5 * time.Second) // 退出协程 close(done) time.Sleep(1 * time.Second) fmt.Println("main process exit!") }
type cancelCtx struct { Context mu sync.Mutex // protects following fields done chanstruct{} // created lazily, closed by first cancel call children map[canceler]struct{} // set to nil by the first cancel call err error// set to non-nil by the first cancel call }