您尚未登录,请登录后浏览更多内容! 登录 | 立即注册

QQ登录

只需一步,快速开始

 找回密码
 立即注册

QQ登录

只需一步,快速开始

查看: 14815|回复: 0
打印 上一主题 下一主题

[html5] 用于实例化Worker后执行监听

[复制链接]
跳转到指定楼层
楼主
发表于 2018-12-17 21:22:08 | 只看该作者 回帖奖励 |倒序浏览 |阅读模式
  1. void Worker::listen(void)
复制代码
用于实例化Worker后执行监听。
此方法主要用于在Worker进程启动后动态创建新的Worker实例,能够实现同一个进程监听多个端口,支持多种协议。需要注意的是用这种方法只是在当前进程增加监听,并不会动态创建新的进程,也不会触发onWorkerStart方法。
例如一个http Worker启动后实例化一个websocket Worker,那么这个进程即能通过http协议访问,又能通过websocket协议访问。由于websocket Worker和http Worker在同一个进程中,所以它们可以访问共同的内存变量,共享所有socket连接。可以做到接收http请求,然后操作websocket客户端完成向客户端推送数据类似的效果。
注意:
如果PHP版本<=7.0,则不支持在多个子进程中实例化相同端口的Worker。例如A进程创建了监听2016端口的Worker,那么B进程就不能再创建监听2016端口的Worker,否则会报Address already in use错误。例如下面的代码是无法运行的。
  1. use Workerman\Worker;3 m2 k  j% V* E
  2. require_once __DIR__ . '/Workerman/Autoloader.php';) E3 c1 G3 A, H, s' p

  3. - b5 B0 J7 C" ?, L1 ?- ]
  4. $worker = new Worker();# r" H* s! a. P3 k2 t
  5. // 4个进程
    - F! `0 `" d; K  i& F
  6. $worker->count = 4;( V6 S# q5 }: Y1 j# L
  7. // 每个进程启动后在当前进程新增一个Worker监听5 b& V' e$ D' _8 Q/ E, ^( K" c  m
  8. $worker->onWorkerStart = function($worker)
    5 z# o: `/ U2 J6 T5 x
  9. {
    0 F$ Z; I% W( @1 O
  10.     /**
    : G3 n4 u7 j, i& F& \& T
  11.      * 4个进程启动的时候都创建2016端口的Worker5 J3 }+ {* _! S' z, S" k  J9 H
  12.      * 当执行到worker->listen()时会报Address already in use错误, t$ c9 y( c) p# E/ M
  13.      * 如果worker->count=1则不会报错' ]" Y0 J2 U/ \
  14.      */
    ' o) E( q* i$ C# O" j8 O# X4 P
  15.     $inner_worker = new Worker('http://0.0.0.0:2016');
    ! h5 t$ J1 c8 {, V. a
  16.     $inner_worker->onMessage = 'on_message';
    4 V! Q" |3 B- x3 I5 J6 k
  17.     // 执行监听。这里会报Address already in use错误
    - y9 n# [' i, O$ ]
  18.     $inner_worker->listen();4 S6 |3 f& i$ O9 r: {
  19. };) j( V3 Z, f7 T+ z% ~8 d" a

  20.   k; U. `$ e- h4 b# ~% _
  21. $worker->onMessage = 'on_message';
    + \; ]+ w; J2 c  ~  H/ v/ X

  22. , i9 q& _5 o/ K% |
  23. function on_message($connection, $data)
      z/ t1 ]7 l/ \0 X  b3 K: ~0 Y
  24. {. q; ?: d, m/ C4 m! G( R
  25.     $connection->send("hello\n");
    . ~  _& i2 m0 M/ V' z
  26. }* l# K1 e( U# N2 q& q6 C

  27. ) v9 M8 S3 \+ o
  28. // 运行worker
    - V/ W3 C8 _, q: s- q$ Z4 I
  29. Worker::runAll();
    . w2 a5 w2 E6 E: k' c
  30. 如果您的PHP版本>=7.0,可以设置Worker->reusePort=true, 这样可以做到多个子进程创建相同端口的Worker。见下面的例子:( }( {0 q. L/ b+ p% f
  31. : p0 |/ \$ g; E7 [
  32. use Workerman\Worker;) _4 {1 G% p, V# Y9 F. C
  33. require_once './Workerman/Autoloader.php';
    : p, p. J& ^0 Q( m% z7 t$ T5 D6 m! r

  34. 9 s" B& Z7 m8 O& P9 b
  35. $worker = new Worker('text://0.0.0.0:2015');! _; u' w' ?& Y) z$ V( ?6 D
  36. // 4个进程
    + ?/ `* U) _  U  C1 B" N2 p
  37. $worker->count = 4;
    2 W; A; D1 B7 C) a5 Q0 K
  38. // 每个进程启动后在当前进程新增一个Worker监听% w: S$ f) e& i2 M! `
  39. $worker->onWorkerStart = function($worker)
      R0 D( I. C7 g6 ~3 ^  A, |
  40. {
    , g- V4 R* }# R
  41.     $inner_worker = new Worker('http://0.0.0.0:2016');
    , W/ Y% @& P% g6 d; j
  42.     // 设置端口复用,可以创建监听相同端口的Worker(需要PHP>=7.0)5 `! {0 Q4 Q) x* K! O  Q; n3 j4 a
  43.     $inner_worker->reusePort = true;8 l; I5 d: J0 W3 E1 j
  44.     $inner_worker->onMessage = 'on_message';5 _: c, B- h2 s
  45.     // 执行监听。正常监听不会报错
    # R; i" s# K, V4 Z9 J& k; W
  46.     $inner_worker->listen();
    6 A: j! [6 o9 z; _+ s% k
  47. };) i: Q4 Z1 G  O1 b# F
  48. - Z9 y% b9 j% I* s3 {# R6 A+ I
  49. $worker->onMessage = 'on_message';
    * G9 ]% O. M8 D* V1 C8 Q

  50. : L: `! X# D' J1 c+ z4 m4 k$ q
  51. function on_message($connection, $data)" Z* E) L/ Q/ ^
  52. {
    % ?( v9 u4 B/ \- q' d6 D  Y
  53.     $connection->send("hello\n");; F. Z7 x: ]" R2 r- r/ k
  54. }- A5 L* w7 ~$ @( B. Q& o: ]

  55. # n  ^" I. `6 t. j
  56. // 运行worker5 W6 H4 C% x: L
  57. Worker::runAll();
复制代码
示例 php后端及时推送消息给客户端
原理:
1、建立一个websocket Worker,用来维持客户端长连接
2、websocket Worker内部建立一个text Worker
3、websocket Worker 与 text Worker是同一个进程,可以方便的共享客户端连接
4、某个独立的php后台系统通过text协议与text Worker通讯
5、text Worker操作websocket连接完成数据推送
代码及步骤
push.php
  1. <?php
    0 N/ H) k$ W% k; f: d" ~  e
  2. use Workerman\Worker;3 {* H3 A0 R4 N9 Q  q1 V" U
  3. require_once './Workerman/Autoloader.php';
    ! t# p. [9 _+ h( X
  4. // 初始化一个worker容器,监听1234端口. a2 }- |% Y; _4 Q
  5. $worker = new Worker('websocket://0.0.0.0:1234');  _& t5 G! [4 t

  6. 6 K9 P4 v! {) N2 V' _% l0 p
  7. /*
    5 [# a4 W# Q: ?, E, l
  8. * 注意这里进程数必须设置为1,否则会报端口占用错误8 m8 G$ d4 {- s: @$ M
  9. * (php 7可以设置进程数大于1,前提是$inner_text_worker->reusePort=true)
    6 n) M9 O; x! T; m1 O  |
  10. */
    ! H, E8 S" d: a! m* B) l3 q. Z8 g
  11. $worker->count = 1;
    9 p/ ^( C5 ?! a9 _. ~8 @
  12. // worker进程启动后创建一个text Worker以便打开一个内部通讯端口- E& v" m0 A; ~: E
  13. $worker->onWorkerStart = function($worker)
    + x8 O  ^2 y' t) c2 p8 r
  14. {; K, v. ?& ]' A8 E: Q
  15.     // 开启一个内部端口,方便内部系统推送数据,Text协议格式 文本+换行符
    3 K: I" d: ?% H
  16.     $inner_text_worker = new Worker('text://0.0.0.0:5678');
    / a0 D2 L; E. V
  17.     $inner_text_worker->onMessage = function($connection, $buffer)
    % t7 Y( a* Y5 W, q* P$ n, p2 l3 w7 b& x
  18.     {
    * L/ Q. _3 B6 i8 n6 o
  19.         // $data数组格式,里面有uid,表示向那个uid的页面推送数据& V7 m" N1 @$ z9 W) M4 l
  20.         $data = json_decode($buffer, true);
    ' Q; Y' s- V9 a9 b. n
  21.         $uid = $data['uid'];/ P& w9 S, X7 t) ]2 H# f3 ^& j
  22.         // 通过workerman,向uid的页面推送数据
    9 a0 }! l8 E- c- ~/ I/ @/ Y9 C0 }
  23.         $ret = sendMessageByUid($uid, $buffer);+ ]) C9 _1 n6 p" F1 ?$ F1 b
  24.         // 返回推送结果! k2 G2 {2 `8 s+ |* ]$ W  O+ g
  25.         $connection->send($ret ? 'ok' : 'fail');
    9 A  w4 ^' C# p% @9 n" Q% Z0 [1 K! J6 d
  26.     };
    0 x7 L4 F7 y8 }7 B9 Z  q
  27.     // ## 执行监听 ##
    - F$ A, s' h1 o( G3 k7 o& d/ ~
  28.     $inner_text_worker->listen();) u7 M2 B' P! p& g7 j0 S
  29. };9 Q9 N- o6 l9 g0 T
  30. // 新增加一个属性,用来保存uid到connection的映射- j% M/ O) n4 w, Y/ m7 p
  31. $worker->uidConnections = array();
    ! {: j- R  L" F* v* V5 p
  32. // 当有客户端发来消息时执行的回调函数& q$ ]% P: h7 |! X5 n
  33. $worker->onMessage = function($connection, $data)
    ; v! D+ S3 ^- L: U
  34. {
    " U& k5 F) p: u$ Y
  35.     global $worker;
    6 ]0 s1 Q: i1 T- X
  36.     // 判断当前客户端是否已经验证,既是否设置了uid8 h, T3 U+ A( L& w; s9 r) _& P
  37.     if(!isset($connection->uid))
    1 u( U8 i4 d$ c6 e. ]
  38.     {8 }* E8 o9 ?: U5 O4 i* u
  39.        // 没验证的话把第一个包当做uid(这里为了方便演示,没做真正的验证)
    8 V) l9 ~% }1 n$ L: q& G& b
  40.        $connection->uid = $data;
    : _6 f7 L; c% T8 X3 h+ O2 Q1 ^: l
  41.        /* 保存uid到connection的映射,这样可以方便的通过uid查找connection,
    / l6 Y) c0 t/ c1 }9 l5 `8 l: a
  42.         * 实现针对特定uid推送数据
    + ]* w! L. @8 m* O$ E  p
  43.         */9 \; I9 s# b6 v: [
  44.        $worker->uidConnections[$connection->uid] = $connection;/ C4 H( p: ?5 p+ g  l. K; F2 w
  45.        return;& ]2 Q$ R6 ~+ k: p" h' h
  46.     }
    * M, U7 a1 R- J  Q  X+ j# q
  47. };, n( j+ O! S# b# h2 m
  48. * L5 l- E% n4 s( [( p
  49. // 当有客户端连接断开时
    # i- a4 t% g  P; k( @  G3 F
  50. $worker->onClose = function($connection)
    & X) }2 V3 s) S
  51. {) r  O9 L: I) K- G1 j4 p1 n
  52.     global $worker;6 y& v2 t+ u, L" |, n8 S
  53.     if(isset($connection->uid))+ l5 i: o( {- q
  54.     {
    & `# C7 \" l& ~' Y
  55.         // 连接断开时删除映射
    , _$ W4 S% i* W) y
  56.         unset($worker->uidConnections[$connection->uid]);, Z. b3 K1 G, w- l! c
  57.     }# K9 R! ]5 W6 w, A: b# }! W% V
  58. };; ]; `5 v( _) r; n: W+ j
  59. & f( D6 r; ?' N2 k" a
  60. // 向所有验证的用户推送数据; |- h: U" ^+ m$ f
  61. function broadcast($message)
    # k+ V4 U1 H  D  [2 N: L
  62. {
      P+ R9 q" ~0 O: k* ]4 D5 m
  63.    global $worker;
      }4 g( y9 W& S0 k
  64.    foreach($worker->uidConnections as $connection)9 b. [  Y) Z& S8 P, N" ^
  65.    {9 X) V4 ^1 Q+ q
  66.         $connection->send($message);
    4 \/ Z/ ^* {5 Y) K
  67.    }
    / X4 g& Q  X' _- p+ ?
  68. }
    3 ~' J5 R$ c, D4 v6 v7 I, V

  69. / T& d* R; M1 @$ F8 P
  70. // 针对uid推送数据3 w+ i# @6 K0 {- A# S: o) [
  71. function sendMessageByUid($uid, $message)
    % x4 e7 g# K. r4 W+ p  F& V8 @
  72. {/ \& {3 \7 ~% U2 B+ n
  73.     global $worker;
    % X9 w' F% k6 v8 W8 R
  74.     if(isset($worker->uidConnections[$uid]))- b# b' x& x/ s) _
  75.     {% K! t$ Y( K) G1 z1 [
  76.         $connection = $worker->uidConnections[$uid];
    - z" P' a' |0 N/ _
  77.         $connection->send($message);: k9 y% d$ j% r. _
  78.         return true;
    4 N! y: s% k& e$ a' x# s  o
  79.     }
      K+ }9 L( P3 V$ `5 T
  80.     return false;1 f' a- a* e' j8 {- ?
  81. }$ Y2 R9 g9 R$ L
  82. ( Z. E2 C. |) r' y
  83. // 运行所有的worker
    . I+ c/ B9 _# u8 b. O9 D& s
  84. Worker::runAll();
复制代码
启动后端服务 php push.php start -d
前端接收推送的js代码
  1. var ws = new WebSocket('ws://127.0.0.1:1234');! q- m6 _7 L# V4 Z8 x; B
  2. ws.onopen = function(){
    - {8 I  g. u  {4 v! r0 M( R
  3.     var uid = 'uid1';
    # m  w* J5 T6 _/ ]; O0 A3 n
  4.     ws.send(uid);! x  G$ p2 _9 [3 i7 R; y
  5. };
    & Q- {3 `  q: C5 P7 w) N, N
  6. ws.onmessage = function(e){
      ], A4 `; P& }+ ^3 j! g
  7.     alert(e.data);9 H  J( r& U6 e# `) r
  8. };
复制代码
后端推送消息的代码
  1. // 建立socket连接到内部推送端口
    - ~  X$ L  [7 o% i5 L
  2. $client = stream_socket_client('tcp://127.0.0.1:5678', $errno, $errmsg, 1);
    , D$ W0 |7 a4 u$ q+ \, ~! ]& r% ~$ B
  3. // 推送的数据,包含uid字段,表示是给这个uid推送% a4 {, F0 J+ L' l+ _
  4. $data = array('uid'=>'uid1', 'percent'=>'88%');
    # u, C9 b% Z5 U
  5. // 发送数据,注意5678端口是Text协议的端口,Text协议需要在数据末尾加上换行符
    + G9 x. _# h. b& I9 b
  6. fwrite($client, json_encode($data)."\n");! V* Y5 R+ Y) j, I6 M/ t, @
  7. // 读取推送结果5 j4 F! T7 X$ K" Q$ C* }+ {
  8. echo fread($client, 8192);
复制代码

* f/ q0 Z9 F- }6 ]3 T6 w) {7 E; t, I: g3 a
分享到:  QQ好友和群QQ好友和群 QQ空间QQ空间 腾讯微博腾讯微博 腾讯朋友腾讯朋友
收藏收藏 分享分享 支持支持 反对反对
您需要登录后才可以回帖 登录 | 立即注册

本版积分规则

GMT+8, 2026-3-17 21:31 , Processed in 0.051584 second(s), 19 queries .

Copyright © 2001-2026 Powered by cncml! X3.2. Theme By cncml!