Elegantly read http request or response data

There are many ways to read data from http.Request.Body or http.Response.Body. Most code uses ioutil.ReadAll to read all the data at once; if the data is JSON, you can also use json.NewDecoder to build a decoder directly from the io.Reader. But if you profile such a program with pprof, you will often find bytes.makeSlice allocating a lot of memory, frequently at the top of the list. Today, let's talk about how to read data from HTTP efficiently and elegantly.

Background

We run many API services, all using JSON as the data format: the request body is a JSON string, the server does some business processing when a request arrives, and then calls further services. All services communicate over HTTP (why not RPC? because every service is also open to third parties, and HTTP + JSON is easier to integrate against). Most request bodies are between 1K and 4K, and most responses between 1K and 8K. In the early days all services used ioutil.ReadAll to read data. As traffic grew, pprof analysis showed bytes.makeSlice persistently near the top, accounting for about 1/10 of the program's total memory allocation, so I decided to optimize it. What follows is a record of the whole optimization process.

pprof analysis

Here we use the API in https://github.com/thinkeridea/go-extend/blob/master/exnet/exhttp/expprof/pprof.go to implement the /debug/pprof monitoring interface in the production environment. The standard library's net/http/pprof package is not used because it automatically registers its routes and leaves the API open permanently. This package lets you control whether the API is exposed and closes the interface automatically after a specified time, to avoid sniffing by tools.

After the service had been deployed and stable for a while (about a day and a half), I downloaded the allocs profile with curl and analyzed it with the following commands.

```
$ go tool pprof allocs
File: xxx
Type: alloc_space
Time: Jan 25, 2019 at 3:02pm (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 604.62GB, 44.50% of 1358.61GB total
Dropped 776 nodes (cum <= 6.79GB)
Showing top 10 nodes out of 155
      flat  flat%   sum%        cum   cum%
  111.40GB  8.20%  8.20%   111.40GB  8.20%  bytes.makeSlice
  107.72GB  7.93% 16.13%   107.72GB  7.93%  github.com/sirupsen/logrus.(*Entry).WithFields
   65.94GB  4.85% 20.98%    65.94GB  4.85%  strings.Replace
   54.10GB  3.98% 24.96%    56.03GB  4.12%  github.com/json-iterator/go.(*frozenConfig).Marshal
   47.54GB  3.50% 28.46%    47.54GB  3.50%  net/url.unescape
   47.11GB  3.47% 31.93%    48.16GB  3.55%  github.com/json-iterator/go.(*Iterator).readStringSlowPath
   46.63GB  3.43% 35.36%   103.04GB  7.58%  handlers.(*AdserviceHandler).returnAd
   42.43GB  3.12% 38.49%    84.62GB  6.23%  models.LogItemsToBytes
   42.22GB  3.11% 41.59%    42.22GB  3.11%  strings.Join
   39.52GB  2.91% 44.50%    87.06GB  6.41%  net/url.parseQuery
```

From the results, a total of 1358.61GB was allocated during the collection period, with the top 10 accounting for 44.50% — and bytes.makeSlice alone for nearly 1/10. So let's see who is calling bytes.makeSlice.

```
(pprof) web bytes.makeSlice
```

From the figure (the callers above ioutil.ReadAll are cut off for length), we can see that the method ultimately calling bytes.makeSlice is ioutil.ReadAll, and about 90% of those calls are reads of HTTP data. Don't rush to an optimization just because you found the hotspot — first let's see why ioutil.ReadAll causes so much memory allocation.

```go
func readAll(r io.Reader, capacity int64) (b []byte, err error) {
	var buf bytes.Buffer
	// If the buffer overflows, we get bytes.ErrTooLarge.
	// Return that as an error. Any other panic remains.
	defer func() {
		e := recover()
		if e == nil {
			return
		}
		if panicErr, ok := e.(error); ok && panicErr == bytes.ErrTooLarge {
			err = panicErr
		} else {
			panic(e)
		}
	}()
	if int64(int(capacity)) == capacity {
		buf.Grow(int(capacity))
	}
	_, err = buf.ReadFrom(r)
	return buf.Bytes(), err
}

func ReadAll(r io.Reader) ([]byte, error) {
	return readAll(r, bytes.MinRead)
}
```

The above is the standard library's ioutil.ReadAll. Every call creates a fresh var buf bytes.Buffer and grows it via buf.Grow(int(capacity)) only to bytes.MinRead, i.e. 512 bytes. Starting from a buffer that small, a single read of one of our typical bodies needs 2 to 16 memory allocations as the buffer repeatedly grows. This is unbearable — I should manage a buffer myself.
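The extra growth is easy to observe with testing.AllocsPerRun. A minimal, self-contained sketch (the 4096-byte payload mirrors the article's typical body size; exact allocation counts vary by Go version, so none are hard-coded):

```go
package main

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"strings"
	"testing"
)

const payloadSize = 4096

// allocsReadAll measures allocations per ioutil.ReadAll of a 4KB body.
func allocsReadAll() float64 {
	payload := strings.Repeat("x", payloadSize)
	return testing.AllocsPerRun(100, func() {
		ioutil.ReadAll(strings.NewReader(payload))
	})
}

// allocsPrealloc measures allocations when copying into a reused,
// preallocated buffer instead.
func allocsPrealloc() float64 {
	payload := strings.Repeat("x", payloadSize)
	buf := bytes.NewBuffer(make([]byte, 0, payloadSize))
	return testing.AllocsPerRun(100, func() {
		buf.Reset()
		io.Copy(buf, strings.NewReader(payload))
	})
}

func main() {
	fmt.Printf("ioutil.ReadAll:      %.0f allocs/op\n", allocsReadAll())
	fmt.Printf("preallocated buffer: %.0f allocs/op\n", allocsPrealloc())
}
```

On a typical run, ioutil.ReadAll needs several allocations per 4KB read as its internal buffer doubles from 512, while the reused buffer needs only the one for the throwaway strings.NewReader.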

Take a look at the flame graph 🔥. The part marked with a red frame is ioutil.ReadAll, noticeably brighter than its surroundings.

Optimizing reading methods

Create a large enough buffer yourself to reduce the problem of multiple expansions due to insufficient capacity.

```go
// Note make([]byte, 0, 4096): length 0, capacity 4096. A non-zero length
// would prepend 4096 zero bytes ahead of the copied data.
buffer := bytes.NewBuffer(make([]byte, 0, 4096))
_, err := io.Copy(buffer, request.Body)
if err != nil {
	return nil, err
}
```

Well, that should be about right. Why initialize to 4096? It's an average: most bodies are smaller than 4096, and even for those that are larger, usually only one extra allocation is needed.

But is this really a good idea? Of course not. This buffer needs to be created for each request. Shouldn't we consider reuse? Using sync.Pool to create a buffer pool would be even better.

Here is the simplified code for optimizing read requests:

```go
package adapter

import (
	"bytes"
	"io"
	"net/http"
	"sync"

	jsoniter "github.com/json-iterator/go"
	"github.com/sirupsen/logrus"
	"github.com/thinkeridea/go-extend/exbytes"
)

type Adapter struct {
	pool sync.Pool
}

func New() *Adapter {
	return &Adapter{
		pool: sync.Pool{
			New: func() interface{} {
				return bytes.NewBuffer(make([]byte, 0, 4096))
			},
		},
	}
}

func (api *Adapter) GetRequest(r *http.Request) (*Request, error) {
	buffer := api.pool.Get().(*bytes.Buffer)
	buffer.Reset()
	defer func() {
		if buffer != nil {
			api.pool.Put(buffer)
			buffer = nil
		}
	}()

	_, err := io.Copy(buffer, r.Body)
	if err != nil {
		return nil, err
	}

	request := &Request{}
	if err = jsoniter.Unmarshal(buffer.Bytes(), request); err != nil {
		logrus.WithFields(logrus.Fields{
			"json": exbytes.ToString(buffer.Bytes()),
		}).Errorf("jsoniter.UnmarshalJSON fail. error:%v", err)
		return nil, err
	}
	api.pool.Put(buffer)
	buffer = nil

	// ....

	return request, nil
}
```

Does this use of sync.Pool look a bit odd? The combination of the defer and api.pool.Put(buffer); buffer = nil deserves an explanation. To maximize buffer reuse, a buffer should go back to the pool as soon as it is no longer needed. The defer with the buffer != nil check exists for error paths: if business logic fails before the buffer has been returned, the defer returns it, because writing api.pool.Put(buffer) in every error branch is ugly and easy to forget. On the success path, calling api.pool.Put(buffer); buffer = nil as soon as the buffer is definitely done gets it back into the pool earlier, which improves reuse and reduces the creation of new buffers.
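One way to make this pattern harder to misuse is to centralize the Get/Reset and Put logic in small helpers. This is only a sketch — getBuffer and putBuffer are hypothetical names, not part of the original service code:

```go
package main

import (
	"bytes"
	"fmt"
	"sync"
)

type Adapter struct {
	pool sync.Pool
}

func New() *Adapter {
	return &Adapter{
		pool: sync.Pool{
			New: func() interface{} {
				return bytes.NewBuffer(make([]byte, 0, 4096))
			},
		},
	}
}

// getBuffer fetches a buffer from the pool and resets it, so callers
// always see an empty buffer even when the backing array is reused.
func (api *Adapter) getBuffer() *bytes.Buffer {
	b := api.pool.Get().(*bytes.Buffer)
	b.Reset()
	return b
}

// putBuffer returns a buffer to the pool; nil is tolerated so it can be
// called unconditionally from a defer after the buffer was already returned.
func (api *Adapter) putBuffer(b *bytes.Buffer) {
	if b != nil {
		api.pool.Put(b)
	}
}

func main() {
	api := New()
	buf := api.getBuffer()
	buf.WriteString("hello")
	fmt.Println(buf.Len()) // 5
	api.putBuffer(buf)

	// The next Get may hand back the same backing array, but Reset
	// guarantees it arrives empty.
	buf2 := api.getBuffer()
	fmt.Println(buf2.Len()) // 0
}
```

The early-return idiom then becomes `api.putBuffer(buffer); buffer = nil`, with `defer func() { api.putBuffer(buffer) }()` covering the error paths.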

Is that all? Not quite. As mentioned before, the service also builds outgoing requests. Let's see how to optimize those.

```go
package adapter

import (
	"bytes"
	"fmt"
	"io"
	"io/ioutil"
	"net/http"
	"sync"

	jsoniter "github.com/json-iterator/go"
	"github.com/sirupsen/logrus"
	"github.com/thinkeridea/go-extend/exbytes"
)

type Adapter struct {
	pool sync.Pool
}

func New() *Adapter {
	return &Adapter{
		pool: sync.Pool{
			New: func() interface{} {
				return bytes.NewBuffer(make([]byte, 0, 4096))
			},
		},
	}
}

func (api *Adapter) Request(r *Request) (*Response, error) {
	var err error
	buffer := api.pool.Get().(*bytes.Buffer)
	buffer.Reset()
	defer func() {
		if buffer != nil {
			api.pool.Put(buffer)
			buffer = nil
		}
	}()

	e := jsoniter.NewEncoder(buffer)
	err = e.Encode(r)
	if err != nil {
		logrus.WithFields(logrus.Fields{
			"request": r,
		}).Errorf("jsoniter.Marshal failure: %v", err)
		return nil, fmt.Errorf("jsoniter.Marshal failure: %v", err)
	}

	data := buffer.Bytes()
	req, err := http.NewRequest("POST", "http://xxx.com", buffer)
	if err != nil {
		logrus.WithFields(logrus.Fields{
			"data": exbytes.ToString(data),
		}).Errorf("http.NewRequest failed: %v", err)
		return nil, fmt.Errorf("http.NewRequest failed: %v", err)
	}

	req.Header.Set("User-Agent", "xxx")

	httpResponse, err := http.DefaultClient.Do(req)
	if httpResponse != nil {
		defer func() {
			io.Copy(ioutil.Discard, httpResponse.Body)
			httpResponse.Body.Close()
		}()
	}

	if err != nil {
		logrus.WithFields(logrus.Fields{
			"url": "http://xxx.com",
		}).Errorf("query service failed %v", err)
		return nil, fmt.Errorf("query service failed %v", err)
	}

	if httpResponse.StatusCode != http.StatusOK {
		logrus.WithFields(logrus.Fields{
			"url":         "http://xxx.com",
			"status":      httpResponse.Status,
			"status_code": httpResponse.StatusCode,
		}).Errorf("invalid http status code")
		return nil, fmt.Errorf("invalid http status code")
	}

	buffer.Reset()
	_, err = io.Copy(buffer, httpResponse.Body)
	if err != nil {
		return nil, fmt.Errorf("adapter io.copy failure error:%v", err)
	}

	respData := buffer.Bytes()
	logrus.WithFields(logrus.Fields{
		"response_json": exbytes.ToString(respData),
	}).Debug("response json")

	res := &Response{}
	err = jsoniter.Unmarshal(respData, res)
	if err != nil {
		logrus.WithFields(logrus.Fields{
			"data": exbytes.ToString(respData),
			"url":  "http://xxx.com",
		}).Errorf("adapter jsoniter.Unmarshal failed, error:%v", err)
		return nil, fmt.Errorf("adapter jsoniter.Unmarshal failed, error:%v", err)
	}

	api.pool.Put(buffer)
	buffer = nil

	// ...
	return res, nil
}
```

This example is similar to the previous one, except the buffer is used not only to read http.Response.Body but also, via jsoniter.NewEncoder, to serialize the request into a JSON string that becomes the body of http.NewRequest. Calling jsoniter.Marshal directly would also allocate a lot of memory: jsoniter uses an internal buffer too, with a default size of 512. Its code is as follows:

```go
func (cfg Config) Froze() API {
	api := &frozenConfig{
		sortMapKeys:                   cfg.SortMapKeys,
		indentionStep:                 cfg.IndentionStep,
		objectFieldMustBeSimpleString: cfg.ObjectFieldMustBeSimpleString,
		onlyTaggedField:               cfg.OnlyTaggedField,
		disallowUnknownFields:         cfg.DisallowUnknownFields,
	}
	api.streamPool = &sync.Pool{
		New: func() interface{} {
			return NewStream(api, nil, 512)
		},
	}
	// .....
	return api
}
```

And after serialization, a data copy will be made:

```go
func (cfg *frozenConfig) Marshal(v interface{}) ([]byte, error) {
	stream := cfg.BorrowStream(nil)
	defer cfg.ReturnStream(stream)
	stream.WriteVal(v)
	if stream.Error != nil {
		return nil, stream.Error
	}
	result := stream.Buffer()
	copied := make([]byte, len(result))
	copy(copied, result)
	return copied, nil
}
```

Since we already manage a buffer, we might as well use it for both steps and save those allocations ^_^. Just be sure to call buffer.Reset() before reading http.Response.Body into it. That basically completes the read optimization for http.Request.Body and http.Response.Body; the concrete effect can be checked after the change has run in production for a while and stabilized.

Effect analysis

After running online for a day, let’s take a look at the results.

```
$ go tool pprof allocs2
File: connect_server
Type: alloc_space
Time: Jan 26, 2019 at 10:27am (CST)
Entering interactive mode (type "help" for commands, "o" for options)
(pprof) top
Showing nodes accounting for 295.40GB, 40.62% of 727.32GB total
Dropped 738 nodes (cum <= 3.64GB)
Showing top 10 nodes out of 174
      flat  flat%   sum%        cum   cum%
   73.52GB 10.11% 10.11%    73.52GB 10.11%  git.tvblack.com/tvblack/connect_server/vendor/github.com/sirupsen/logrus.(*Entry).WithFields
   31.70GB  4.36% 14.47%    31.70GB  4.36%  net/url.unescape
   27.49GB  3.78% 18.25%    54.87GB  7.54%  git.tvblack.com/tvblack/connect_server/models.LogItemsToBytes
   27.41GB  3.77% 22.01%    27.41GB  3.77%  strings.Join
   25.04GB  3.44% 25.46%    25.04GB  3.44%  bufio.NewWriterSize
   24.81GB  3.41% 28.87%    24.81GB  3.41%  bufio.NewReaderSize
   23.91GB  3.29% 32.15%    23.91GB  3.29%  regexp.(*bitState).reset
   23.06GB  3.17% 35.32%    23.06GB  3.17%  math/big.nat.make
   19.90GB  2.74% 38.06%    20.35GB  2.80%  git.tvblack.com/tvblack/connect_server/vendor/github.com/json-iterator/go.(*Iterator).readStringSlowPath
   18.58GB  2.56% 40.62%    19.12GB  2.63%  net/textproto.(*Reader).ReadMIMEHeader
```

Wow! bytes.makeSlice finally disappeared from the top 10. That’s great. Let’s take a look at other calls to bytes.makeSlice.

```
(pprof) web bytes.makeSlice
```

From the figure, the allocation from bytes.makeSlice is now very small, and most of what remains comes from http.Request.ParseForm reading http.Request.Body with ioutil.ReadAll. The optimization worked very well.

Let's take a look at the more intuitive flame graph. Compared with before optimization, it is obvious that ioutil.ReadAll is no longer visible.

Problems encountered during optimization

I'm ashamed to say that a mistake during the optimization took the production environment down for two minutes; automatic deployment and an immediate rollback recovered it quickly. I then analyzed the code, fixed the problem, and only rolled the optimization out fully afterwards. Here is a summary of what went wrong.

When building the outgoing http request I optimized two parts: serializing the JSON and reading http.Response.Body. Clinging to the idea of returning the buffer to the pool as early as possible, and knowing that http.DefaultClient.Do(req) is a comparatively slow network call, I put the buffer back before Do and acquired a fresh one afterwards to read http.Response.Body. The code was as follows:

```go
package adapter

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
	"sync"

	jsoniter "github.com/json-iterator/go"
)

type Adapter struct {
	pool sync.Pool
}

func New() *Adapter {
	return &Adapter{
		pool: sync.Pool{
			New: func() interface{} {
				return bytes.NewBuffer(make([]byte, 0, 4096))
			},
		},
	}
}

func (api *Adapter) Request(r *Request) (*Response, error) {
	var err error
	buffer := api.pool.Get().(*bytes.Buffer)
	buffer.Reset()
	defer func() {
		if buffer != nil {
			api.pool.Put(buffer)
			buffer = nil
		}
	}()

	e := jsoniter.NewEncoder(buffer)
	err = e.Encode(r)
	if err != nil {
		return nil, fmt.Errorf("jsoniter.Marshal failure: %v", err)
	}

	req, err := http.NewRequest("POST", "http://xxx.com", buffer)
	if err != nil {
		return nil, fmt.Errorf("http.NewRequest failed: %v", err)
	}

	req.Header.Set("User-Agent", "xxx")

	// BUG: req still references the buffer as its body, but here it is
	// returned to the pool before http.DefaultClient.Do has read it.
	api.pool.Put(buffer)
	buffer = nil

	httpResponse, err := http.DefaultClient.Do(req)

	// ....

	buffer = api.pool.Get().(*bytes.Buffer)
	buffer.Reset()
	defer func() {
		if buffer != nil {
			api.pool.Put(buffer)
			buffer = nil
		}
	}()
	_, err = io.Copy(buffer, httpResponse.Body)
	if err != nil {
		return nil, fmt.Errorf("adapter io.copy failure error:%v", err)
	}

	// ....

	api.pool.Put(buffer)
	buffer = nil

	// ...
	return res, nil
}
```

An error appeared immediately after going online: http: ContentLength=2090 with Body length 0. When sending the request, the data read from the buffer was missing or truncated. What on earth was going on? I rolled back to restore service, then analyzed http.NewRequest and http.DefaultClient.Do. It turns out http.NewRequest does not read the buffer at all; it only records a req.GetBody, and the data is actually read inside http.DefaultClient.Do. Because the buffer went back into the pool before Do, another goroutine could grab and reset it, creating a data race — hence the incomplete body. Embarrassingly, I knew too little about http.Client; I'll try to go through its source when I have time.

Summarize

Use appropriately sized buffers to reduce memory allocation, and let sync.Pool reuse them. It is worth writing this logic yourself rather than relying on a third-party package: even if the package uses the same techniques, it must copy the data when returning it in order to avoid races — jsoniter, for example, uses sync.Pool and a buffer internally but still copies the result before handing it back. And a generic package cannot choose an initial buffer size that suits your workload: too small causes extra copying, too large wastes memory.

Making good use of buffer and sync.Pool in a program can greatly improve the performance of the program, and the combination of these two is very simple to use and does not make the code complicated.
